lsyldliu commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r939466103


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If we fell back to sort merge join during the hash join, the sort merge join function also needs to be closed
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * While building the hash table, if the data written to disk exceeds the threshold, the build
+     * side is larger than expected or the data is heavily skewed, so fall back to the sort merge
+     * join algorithm in advance to handle it.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // First spill all in-memory partitions to disk to release the memory that sorting will need
+        this.table.spillAllInMemoryPartition();

Review Comment:
   I have reverted the first adaptive hash join strategy due to concerns about a performance regression.
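   
   For readers of this thread: the javadoc in the diff above describes the fallback at a high level. Below is a minimal, self-contained sketch of that idea (switching from a hash join build to a sort merge join once spilled bytes cross a threshold). All names here, such as `AdaptiveJoinSketch` and `SPILL_THRESHOLD_BYTES`, are hypothetical illustrations and are not the actual `HashJoinOperator` or `SortMergeJoinFunction` APIs.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class AdaptiveJoinSketch {
   
       // Hypothetical threshold: how many bytes we tolerate spilling before
       // giving up on the hash join build.
       private static final long SPILL_THRESHOLD_BYTES = 64L * 1024 * 1024;
   
       private long spilledBytes = 0;
       private boolean fallbackToSortMergeJoin = false;
       private final List<String> bufferedBuildRows = new ArrayList<>();
   
       /** Feed one build-side row; may trigger the fallback. */
       public void processBuildRow(String row, long rowSizeInBytes) {
           if (fallbackToSortMergeJoin) {
               // Already fell back: route rows straight to the sort-merge path.
               sortMergeJoinProcessBuildRow(row);
               return;
           }
           // Pretend the in-memory hash table is full and this row spills to disk.
           spilledBytes += rowSizeInBytes;
           bufferedBuildRows.add(row);
   
           if (spilledBytes > SPILL_THRESHOLD_BYTES) {
               // Too much data on disk: give up on the hash table and hand the
               // already-buffered rows over to the sort-merge join path.
               fallbackToSortMergeJoin = true;
               for (String buffered : bufferedBuildRows) {
                   sortMergeJoinProcessBuildRow(buffered);
               }
               bufferedBuildRows.clear();
           }
       }
   
       private void sortMergeJoinProcessBuildRow(String row) {
           // Placeholder for the sort-merge join's build-side input.
           System.out.println("SMJ build row: " + row);
       }
   
       public static void main(String[] args) {
           AdaptiveJoinSketch sketch = new AdaptiveJoinSketch();
           sketch.processBuildRow("row-1", 32L * 1024 * 1024);
           sketch.processBuildRow("row-2", 48L * 1024 * 1024); // crosses the threshold
           sketch.processBuildRow("row-3", 1024);               // goes straight to the SMJ path
       }
   }
   ```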


