Greg Hogan created FLINK-3385:
---------------------------------

             Summary: Fix outer join skipping unprobed partitions
                 Key: FLINK-3385
                 URL: https://issues.apache.org/jira/browse/FLINK-3385
             Project: Flink
          Issue Type: Bug
          Components: Distributed Runtime
            Reporter: Greg Hogan
            Priority: Critical
             Fix For: 1.0.0


{{MutableHashTable.nextRecord}} performs three steps for a build-side outer 
join:

{code}
        public boolean nextRecord() throws IOException {
                if (buildSideOuterJoin) {
                        return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
                } else {
                        return processProbeIter() || prepareNextPartition();
                }
        }
{code}

{{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to 
{{MutableHashTable.moveToNextBucket}}, which cannot process spilled 
partitions:

{code}
                        if (p.isInMemory()) {
                                ...
                        } else {
                                return false;
                        }
{code}
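
To make the consequence concrete, here is a minimal toy model (not Flink code). The {{Partition}} class and {{collectUnmatched}} method are hypothetical stand-ins, and the sketch assumes, as a simplification, that a spilled partition is simply skipped by the unmatched build-side scan:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model, not Flink code: an in-memory-only scan misses unmatched build rows. */
class UnmatchedScanSketch {

    /** Hypothetical stand-in for a hash partition. */
    static final class Partition {
        final boolean inMemory;
        final List<String> unmatchedBuildRecords;

        Partition(boolean inMemory, List<String> unmatchedBuildRecords) {
            this.inMemory = inMemory;
            this.unmatchedBuildRecords = unmatchedBuildRecords;
        }
    }

    /** Emits unmatched build-side records, but only from in-memory partitions. */
    static List<String> collectUnmatched(List<Partition> partitions) {
        List<String> out = new ArrayList<>();
        for (Partition p : partitions) {
            if (p.inMemory) {
                out.addAll(p.unmatchedBuildRecords);
            }
            // Spilled partitions contribute nothing here (assumed simplification
            // of the `return false` branch quoted above).
        }
        return out;
    }

    public static void main(String[] args) {
        List<Partition> partitions = Arrays.asList(
                new Partition(true, Arrays.asList("a")),
                new Partition(false, Arrays.asList("b")));
        // Record "b" lives in a spilled partition and is never emitted here.
        System.out.println(collectUnmatched(partitions)); // prints [a]
    }
}
```

In this model, record "b" can only be recovered if its partition survives into the next pass.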

{{MutableHashTable.prepareNextPartition}} calls 
{{HashPartition.finalizeProbePhase}}, which only registers a spilled partition 
as pending (to be read and processed in the next instantiation of 
{{MutableHashTable}}) if probe-side records were spilled. For an inner 
equi-join this is fine, but for a build-side outer join the unmatched 
build-side records must still be retained. No further probing is necessary 
for such a partition, so could this be short-circuited when it is loaded by 
the next {{MutableHashTable}}?

{code}
                if (isInMemory()) {
                        ...
                }
                else if (this.probeSideRecordCounter == 0) {
                        // partition is empty, no spilled buffers
                        // return the memory buffer
                        freeMemory.add(this.probeSideBuffer.getCurrentSegment());

                        // delete the spill files
                        this.probeSideChannel.close();
                        this.buildSideChannel.deleteChannel();
                        this.probeSideChannel.deleteChannel();
                        return 0;
                }
                else {
                        // flush the last probe side buffer and register this partition as pending
                        this.probeSideBuffer.close();
                        this.probeSideChannel.close();
                        spilledPartitions.add(this);
                        return 1;
                }
{code}
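
One possible direction, sketched as a toy model rather than as the actual fix: let the retain-or-discard decision for a spilled partition also depend on the join type. The method {{finalizeSpilledPartition}} and its {{buildSideOuterJoin}} parameter are hypothetical and are not part of the quoted {{HashPartition}} code. The return values follow the quoted snippet (1 = registered as pending, 0 = discarded).

```java
/** Toy model, not Flink code: retain-or-discard decision for a spilled partition. */
class FinalizeProbePhaseSketch {

    /**
     * Returns 1 if the spilled partition should be kept as pending,
     * 0 if its spill files can be deleted.
     *
     * @param probeSideRecordCounter number of probe-side records spilled to this partition
     * @param buildSideOuterJoin     hypothetical flag: true if unmatched build rows must be emitted
     */
    static int finalizeSpilledPartition(long probeSideRecordCounter, boolean buildSideOuterJoin) {
        if (probeSideRecordCounter == 0 && !buildSideOuterJoin) {
            // Inner join with nothing left to probe: safe to discard.
            return 0;
        }
        // Either probe records remain, or the build side may still hold
        // unmatched records that an outer join has to emit.
        return 1;
    }

    public static void main(String[] args) {
        System.out.println(finalizeSpilledPartition(0, false)); // prints 0
        System.out.println(finalizeSpilledPartition(0, true));  // prints 1
        System.out.println(finalizeSpilledPartition(5, false)); // prints 1
    }
}
```

Under this assumption, a partition retained only for its build side would need no probing in the next pass, which is where the short-circuit asked about above would apply.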



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
