lsyldliu commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r939466103
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If we fell back to sort merge join during the hash join, the sort merge
+        // join function also needs to be closed
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * While building the hash table, if the amount of data spilled to disk exceeds the
+     * threshold, the build side is too large or the data is severely skewed, so fall
+     * back to the sort merge join algorithm early.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // First spill all in-memory partitions to disk to release the memory that will
+        // be used for sorting
+        this.table.spillAllInMemoryPartition();
Review Comment:
I have reverted the first adaptive hash join strategy due to concerns about a performance regression.
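
For reference, here is a minimal, self-contained sketch of the build-side fallback idea described in the Javadoc above: track how much build-side data has spilled, and once a threshold is crossed, hand everything over to a sort merge join. All names here (`AdaptiveHashJoinSketch`, `spillThresholdBytes`, `sortMergeJoinAddBuildRow`) are hypothetical stand-ins, not Flink's actual `HashJoinOperator` API; only the flag `fallbackSMJInBuild` mirrors the diff.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of an adaptive hash join fallback trigger under assumed names;
 * this is not Flink's actual HashJoinOperator implementation.
 */
public class AdaptiveHashJoinSketch {

    // Hypothetical threshold: once the build side has "spilled" more than this
    // many bytes, assume it is too large or skewed and fall back to sort merge join.
    private final long spillThresholdBytes;

    private long bytesSpilledSoFar;     // stand-in for the hash table's spill accounting
    private boolean fallbackSMJInBuild; // mirrors the flag in the PR diff
    private final List<byte[]> bufferedBuildRows = new ArrayList<>();

    public AdaptiveHashJoinSketch(long spillThresholdBytes) {
        this.spillThresholdBytes = spillThresholdBytes;
    }

    /** Feed one build-side row; returns true once the join has fallen back. */
    public boolean addBuildRow(byte[] row) {
        if (fallbackSMJInBuild) {
            // Already fell back: route the row straight to the sort merge join.
            sortMergeJoinAddBuildRow(row);
            return true;
        }
        bufferedBuildRows.add(row);
        bytesSpilledSoFar += row.length; // pretend every buffered row was spilled
        if (bytesSpilledSoFar > spillThresholdBytes) {
            // Threshold exceeded: replay everything buffered so far into the sort
            // merge join, analogous to spilling all in-memory partitions first and
            // then handing the data over.
            fallbackSMJInBuild = true;
            for (byte[] buffered : bufferedBuildRows) {
                sortMergeJoinAddBuildRow(buffered);
            }
            bufferedBuildRows.clear();
        }
        return fallbackSMJInBuild;
    }

    private void sortMergeJoinAddBuildRow(byte[] row) {
        // Placeholder for handing the row to a sort merge join implementation.
    }

    public static void main(String[] args) {
        AdaptiveHashJoinSketch join = new AdaptiveHashJoinSketch(1024);
        boolean fellBack = false;
        for (int i = 0; i < 100; i++) {
            fellBack = join.addBuildRow(new byte[64]); // 100 * 64 B > 1 KiB threshold
        }
        System.out.println("fell back to sort merge join: " + fellBack);
    }
}
```

The key design point the diff reflects is that the fallback decision is made during the build phase, before the probe side is consumed, so the already-buffered build rows can be redirected without rereading the input.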