This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 33308811399 [FLINK-30989][table] Fix some options not taking effect in
batch mode
33308811399 is described below
commit 333088113993f4607038dae391863b5c30d0bc95
Author: fengli <[email protected]>
AuthorDate: Sun Feb 26 15:59:33 2023 +0800
[FLINK-30989][table] Fix some options not taking effect in batch mode
This closes #22024
(cherry picked from commit b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4)
---
.../nodes/exec/batch/BatchExecHashAggregate.java | 9 +++-
.../plan/nodes/exec/batch/BatchExecHashJoin.java | 10 ++++
.../plan/nodes/exec/batch/BatchExecSort.java | 10 +++-
.../plan/utils/SorMergeJoinOperatorUtil.java | 16 ++++++
.../planner/codegen/LongHashJoinGenerator.scala | 5 +-
.../codegen/agg/batch/HashAggCodeGenHelper.scala | 17 ++++--
.../codegen/agg/batch/HashAggCodeGenerator.scala | 11 ++--
.../codegen/LongAdaptiveHashJoinGeneratorTest.java | 6 +++
.../agg/batch/HashAggCodeGeneratorTest.scala | 7 ++-
.../runtime/hashtable/BaseHybridHashTable.java | 23 +++-----
.../table/runtime/hashtable/BinaryHashTable.java | 11 ++--
.../table/runtime/hashtable/LongHashPartition.java | 4 +-
.../runtime/hashtable/LongHybridHashTable.java | 13 ++---
.../runtime/operators/join/HashJoinOperator.java | 13 ++++-
.../operators/join/SortMergeJoinFunction.java | 23 ++++++--
.../sort/AbstractBinaryExternalMerger.java | 10 ++--
.../operators/sort/BinaryExternalMerger.java | 4 +-
.../operators/sort/BinaryExternalSorter.java | 46 ++++++++--------
.../operators/sort/BinaryInMemorySortBuffer.java | 4 +-
.../operators/sort/BinaryKVExternalMerger.java | 4 +-
.../operators/sort/BinaryKVInMemorySortBuffer.java | 3 +-
.../operators/sort/BufferedKVExternalSorter.java | 28 ++++------
.../table/runtime/operators/sort/SortOperator.java | 21 +++++++-
.../flink/table/runtime/util/FileChannelUtil.java | 8 +--
.../runtime/hashtable/BinaryHashTableTest.java | 25 ++++-----
.../table/runtime/hashtable/LongHashTableTest.java | 10 ++--
.../runtime/operators/aggregate/HashAggTest.java | 6 ---
.../aggregate/SumHashAggTestOperator.java | 16 +++---
.../join/Int2HashJoinOperatorTestBase.java | 23 ++++++++
.../join/Int2SortMergeJoinOperatorTest.java | 16 ++++++
.../join/String2HashJoinOperatorTest.java | 19 +++++++
.../join/String2SortMergeJoinOperatorTest.java | 16 ++++++
.../operators/sort/BinaryExternalSorterTest.java | 63 +++++++++++++++++++---
.../sort/BufferedKVExternalSorterTest.java | 8 ++-
34 files changed, 367 insertions(+), 141 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
index 27d672706ed..49ea262626a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
@@ -139,7 +139,14 @@ public class BatchExecHashAggregate extends
ExecNodeBase<RowData>
auxGrouping,
isMerge,
isFinal,
- supportAdaptiveLocalHashAgg);
+ supportAdaptiveLocalHashAgg,
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ config.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes());
}
return ExecNodeUtil.createOneInputTransformation(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
index d390292a901..ef178487eff 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
@@ -213,6 +213,12 @@ public class BatchExecHashJoin extends
ExecNodeBase<RowData>
condFunc,
1.0 * externalBufferMemory / managedMemory);
+ boolean compressionEnabled =
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+ int compressionBlockSize =
+ (int)
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes();
if (LongHashJoinGenerator.support(hashJoinType, keyType,
joinSpec.getFilterNulls())) {
operator =
LongHashJoinGenerator.gen(
@@ -229,6 +235,8 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
reverseJoin,
condFunc,
leftIsBuild,
+ compressionEnabled,
+ compressionBlockSize,
sortMergeJoinFunction);
} else {
operator =
@@ -236,6 +244,8 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
HashJoinOperator.newHashJoinOperator(
hashJoinType,
leftIsBuild,
+ compressionEnabled,
+ compressionBlockSize,
condFunc,
reverseJoin,
joinSpec.getFilterNulls(),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
index a808c86d306..929d6ed60ba 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
@@ -81,7 +81,15 @@ public class BatchExecSort extends ExecNodeBase<RowData>
SortOperator operator =
new SortOperator(
codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
-
codeGen.generateRecordComparator("BatchExecSortComparator"));
+
codeGen.generateRecordComparator("BatchExecSortComparator"),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ config.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
long sortMemory =
config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
return ExecNodeUtil.createOneInputTransformation(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
index 634ceb345dd..891aa185786 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/SorMergeJoinOperatorUtil.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.utils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
@@ -48,6 +49,17 @@ public class SorMergeJoinOperatorUtil {
double externalBufferMemRatio) {
int[] keyPositions = IntStream.range(0, leftKeys.length).toArray();
+ int maxNumFileHandles =
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
+ boolean compressionEnabled =
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+ int compressionBlockSize =
+ (int)
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes();
+ boolean asyncMergeEnabled =
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
+
SortCodeGenerator leftSortGen =
SortUtil.newSortGen(config, classLoader, leftKeys, leftType);
SortCodeGenerator rightSortGen =
@@ -57,6 +69,10 @@ public class SorMergeJoinOperatorUtil {
externalBufferMemRatio,
joinType,
leftIsSmaller,
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled,
condFunc,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(config, classLoader),
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index 9706a3aee55..4949b698206 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -111,6 +111,8 @@ object LongHashJoinGenerator {
reverseJoinFunction: Boolean,
condFunc: GeneratedJoinCondition,
leftIsBuild: Boolean,
+ compressionEnabled: Boolean,
+ compressionBlockSize: Int,
sortMergeJoinFunction: SortMergeJoinFunction):
CodeGenOperatorFactory[RowData] = {
val buildSer = new BinaryRowDataSerializer(buildType.getFieldCount)
@@ -185,7 +187,8 @@ object LongHashJoinGenerator {
|public class $tableTerm extends
${classOf[LongHybridHashTable].getCanonicalName} {
|
| public $tableTerm() {
- | super(getContainingTask().getJobConfiguration(),
getContainingTask(),
+ | super(getContainingTask(),
+ | $compressionEnabled, $compressionBlockSize,
| $buildSerTerm, $probeSerTerm,
| getContainingTask().getEnvironment().getMemoryManager(),
| computeMemorySize(),
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index ac9febcfe6e..55b0e58366d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -585,7 +585,10 @@ object HashAggCodeGenHelper {
outputType: RowType,
outputResultFromMap: String,
sorterTerm: String,
- retryAppend: String): (String, String) = {
+ retryAppend: String,
+ maxNumFileHandles: Int,
+ compressionEnabled: Boolean,
+ compressionBlockSize: Int): (String, String) = {
val (grouping, auxGrouping) = groupingAndAuxGrouping
if (isFinal) {
val logMapSpilling =
@@ -603,7 +606,10 @@ object HashAggCodeGenHelper {
groupKeyRowType,
groupKeyTypesTerm,
aggBufferTypesTerm,
- sorterTerm)
+ sorterTerm,
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize)
val fallbackToSortAggCode = genFallbackToSortAgg(
ctx,
builder,
@@ -711,7 +717,10 @@ object HashAggCodeGenHelper {
groupKeyRowType: RowType,
groupKeyTypesTerm: String,
aggBufferTypesTerm: String,
- sorterTerm: String): String = {
+ sorterTerm: String,
+ maxNumFileHandles: Int,
+ compressionEnabled: Boolean,
+ compressionBlockSize: Int): String = {
val keyComputerTerm = CodeGenUtils.newName("keyComputer")
val recordComparatorTerm = CodeGenUtils.newName("recordComparator")
val prepareSorterCode =
@@ -727,7 +736,7 @@ object HashAggCodeGenHelper {
| new $binaryRowSerializerTypeTerm($aggBufferTypesTerm.length),
| $keyComputerTerm, $recordComparatorTerm,
|
getContainingTask().getEnvironment().getMemoryManager().getPageSize(),
- | getContainingTask().getJobConfiguration()
+ | $maxNumFileHandles, $compressionEnabled, $compressionBlockSize
| );
""".stripMargin
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
index d13fd831522..e9ccb27aef0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
@@ -105,8 +105,10 @@ object HashAggCodeGenerator {
auxGrouping: Array[Int],
isMerge: Boolean,
isFinal: Boolean,
- supportAdaptiveLocalHashAgg: Boolean)
- : GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = {
+ supportAdaptiveLocalHashAgg: Boolean,
+ maxNumFileHandles: Int,
+ compressionEnabled: Boolean,
+ compressionBlockSize: Int):
GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = {
val aggInfos = aggInfoList.aggInfos
val functionIdentifiers = AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
@@ -238,7 +240,10 @@ object HashAggCodeGenerator {
outputType,
outputResultFromMap,
sorterTerm,
- retryAppend
+ retryAppend,
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize
)
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
index b591cca3b43..85516450220 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.codegen;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
@@ -128,6 +129,11 @@ public class LongAdaptiveHashJoinGeneratorTest extends
Int2AdaptiveHashJoinOpera
reverseJoinFunction,
condFunc,
buildLeft,
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue(),
+ (int)
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+ .defaultValue()
+ .getBytes(),
sortMergeJoinFunction);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
index 78e50368396..6b8df49ffda 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.codegen.agg.batch
import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.RowData
import
org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.LongAvgAggFunction
import org.apache.flink.table.planner.plan.utils.{AggregateInfo,
AggregateInfoList}
@@ -134,7 +135,11 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
auxGrouping,
isMerge,
isFinal,
- false)
+ false,
+
ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue(),
+ ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue,
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue.getBytes.toInt
+ )
(new CodeGenOperatorFactory[RowData](genOp), iType, oType)
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
index 72dcc28267d..cd71bd9142b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.runtime.hashtable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
@@ -27,7 +26,6 @@ import
org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import
org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
@@ -127,7 +125,7 @@ public abstract class BaseHybridHashTable implements
MemorySegmentPool {
*/
protected FileIOChannel.Enumerator currentEnumerator;
- protected final boolean compressionEnable;
+ protected final boolean compressionEnabled;
protected final BlockCompressionFactory compressionCodecFactory;
protected final int compressionBlockSize;
@@ -135,27 +133,22 @@ public abstract class BaseHybridHashTable implements
MemorySegmentPool {
protected transient long spillInBytes;
public BaseHybridHashTable(
- Configuration conf,
Object owner,
+ boolean compressionEnabled,
+ int compressionBlockSize,
MemoryManager memManager,
long reservedMemorySize,
IOManager ioManager,
int avgRecordLen,
long buildRowCount,
boolean tryDistinctBuildRow) {
-
- // TODO: read compression config from configuration
- this.compressionEnable =
-
conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+ this.compressionEnabled = compressionEnabled;
this.compressionCodecFactory =
- this.compressionEnable
+ this.compressionEnabled
?
BlockCompressionFactory.createBlockCompressionFactory(
BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
: null;
- this.compressionBlockSize =
- (int)
-
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
- .getBytes();
+ this.compressionBlockSize = compressionBlockSize;
this.avgRecordLen = avgRecordLen;
this.buildRowCount = buildRowCount;
this.tryDistinctBuildRow = tryDistinctBuildRow;
@@ -496,7 +489,7 @@ public abstract class BaseHybridHashTable implements
MemorySegmentPool {
ioManager,
id,
retSegments,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
segmentSize);
@@ -517,7 +510,7 @@ public abstract class BaseHybridHashTable implements
MemorySegmentPool {
ioManager,
id,
new LinkedBlockingQueue<>(),
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
segmentSize);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
index 26cfb0bcd9a..b21a00f803b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.runtime.hashtable;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
@@ -143,8 +142,9 @@ public class BinaryHashTable extends BaseHybridHashTable {
BinaryRowData reuseBuildRow;
public BinaryHashTable(
- Configuration conf,
Object owner,
+ boolean compressionEnabled,
+ int compressionBlockSize,
AbstractRowDataSerializer buildSideSerializer,
AbstractRowDataSerializer probeSideSerializer,
Projection<RowData, BinaryRowData> buildSideProjection,
@@ -161,8 +161,9 @@ public class BinaryHashTable extends BaseHybridHashTable {
boolean[] filterNulls,
boolean tryDistinctBuildRow) {
super(
- conf,
owner,
+ compressionEnabled,
+ compressionBlockSize,
memManager,
reservedMemorySize,
ioManager,
@@ -447,7 +448,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
ioManager,
channelWithMeta,
new ArrayList<>(),
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
segmentSize);
@@ -614,7 +615,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
getNotNullNextBuffer(),
this,
this.segmentSize,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize);
area.setPartition(p);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index 306420272bf..a8d9ad9b02a 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -461,7 +461,7 @@ public class LongHashPartition extends
AbstractPagedInputView implements Seekabl
ioAccess,
targetChannel,
bufferReturnQueue,
- longTable.compressionEnable(),
+ longTable.compressionEnabled(),
longTable.compressionCodecFactory(),
longTable.compressionBlockSize(),
segmentSize);
@@ -487,7 +487,7 @@ public class LongHashPartition extends
AbstractPagedInputView implements Seekabl
FileChannelUtil.createOutputView(
ioAccess,
probeChannelEnumerator.next(),
- longTable.compressionEnable(),
+ longTable.compressionEnabled(),
longTable.compressionCodecFactory(),
longTable.compressionBlockSize(),
segmentSize);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
index 75081ea81b7..8ebbb585e47 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/LongHybridHashTable.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.runtime.hashtable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
@@ -70,8 +69,9 @@ public abstract class LongHybridHashTable extends
BaseHybridHashTable {
private LongHashPartition densePartition;
public LongHybridHashTable(
- Configuration conf,
Object owner,
+ boolean compressionEnabled,
+ int compressionBlockSize,
BinaryRowDataSerializer buildSideSerializer,
BinaryRowDataSerializer probeSideSerializer,
MemoryManager memManager,
@@ -80,8 +80,9 @@ public abstract class LongHybridHashTable extends
BaseHybridHashTable {
int avgRecordLen,
long buildRowCount) {
super(
- conf,
owner,
+ compressionEnabled,
+ compressionBlockSize,
memManager,
reservedMemorySize,
ioManager,
@@ -412,7 +413,7 @@ public abstract class LongHybridHashTable extends
BaseHybridHashTable {
ioManager,
channelWithMeta,
new ArrayList<>(),
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
segmentSize);
@@ -658,8 +659,8 @@ public abstract class LongHybridHashTable extends
BaseHybridHashTable {
this.partitionsPendingForSMJ.clear();
}
- public boolean compressionEnable() {
- return compressionEnable;
+ public boolean compressionEnabled() {
+ return compressionEnabled;
}
public BlockCompressionFactory compressionCodecFactory() {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
index 65183a07f03..abdaa22c386 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java
@@ -124,8 +124,9 @@ public abstract class HashJoinOperator extends
TableStreamOperator<RowData>
this.table =
new BinaryHashTable(
- getContainingTask().getJobConfiguration(),
getContainingTask(),
+ parameter.compressionEnabled,
+ parameter.compressionBlockSize,
buildSerializer,
probeSerializer,
parameter.buildProjectionCode.newInstance(cl),
@@ -322,6 +323,8 @@ public abstract class HashJoinOperator extends
TableStreamOperator<RowData>
public static HashJoinOperator newHashJoinOperator(
HashJoinType type,
boolean leftIsBuild,
+ boolean compressionEnable,
+ int compressionBlockSize,
GeneratedJoinCondition condFuncCode,
boolean reverseJoinFunction,
boolean[] filterNullKeys,
@@ -337,6 +340,8 @@ public abstract class HashJoinOperator extends
TableStreamOperator<RowData>
new HashJoinParameter(
type,
leftIsBuild,
+ compressionEnable,
+ compressionBlockSize,
condFuncCode,
reverseJoinFunction,
filterNullKeys,
@@ -372,6 +377,8 @@ public abstract class HashJoinOperator extends
TableStreamOperator<RowData>
static class HashJoinParameter implements Serializable {
HashJoinType type;
boolean leftIsBuild;
+ boolean compressionEnabled;
+ int compressionBlockSize;
GeneratedJoinCondition condFuncCode;
boolean reverseJoinFunction;
boolean[] filterNullKeys;
@@ -387,6 +394,8 @@ public abstract class HashJoinOperator extends
TableStreamOperator<RowData>
HashJoinParameter(
HashJoinType type,
boolean leftIsBuild,
+ boolean compressionEnabled,
+ int compressionBlockSize,
GeneratedJoinCondition condFuncCode,
boolean reverseJoinFunction,
boolean[] filterNullKeys,
@@ -400,6 +409,8 @@ public abstract class HashJoinOperator extends
TableStreamOperator<RowData>
SortMergeJoinFunction sortMergeJoinFunction) {
this.type = type;
this.leftIsBuild = leftIsBuild;
+ this.compressionEnabled = compressionEnabled;
+ this.compressionBlockSize = compressionBlockSize;
this.condFuncCode = condFuncCode;
this.reverseJoinFunction = reverseJoinFunction;
this.filterNullKeys = filterNullKeys;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
index 616f9ebb71c..aa5bed09320 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinFunction.java
@@ -59,6 +59,10 @@ public class SortMergeJoinFunction implements Serializable {
private final FlinkJoinType type;
private final boolean leftIsSmaller;
private final boolean[] filterNulls;
+ private final int maxNumFileHandles;
+ private final boolean compressionEnabled;
+ private final int compressionBlockSize;
+ private final boolean asyncMergeEnabled;
// generated code to cook
private GeneratedJoinCondition condFuncCode;
@@ -93,6 +97,10 @@ public class SortMergeJoinFunction implements Serializable {
double externalBufferMemRatio,
FlinkJoinType type,
boolean leftIsSmaller,
+ int maxNumFileHandles,
+ boolean compressionEnabled,
+ int compressionBlockSize,
+ boolean asyncMergeEnabled,
GeneratedJoinCondition condFuncCode,
GeneratedProjection projectionCode1,
GeneratedProjection projectionCode2,
@@ -105,6 +113,10 @@ public class SortMergeJoinFunction implements Serializable
{
this.externalBufferMemRatio = externalBufferMemRatio;
this.type = type;
this.leftIsSmaller = leftIsSmaller;
+ this.maxNumFileHandles = maxNumFileHandles;
+ this.compressionEnabled = compressionEnabled;
+ this.compressionBlockSize = compressionBlockSize;
+ this.asyncMergeEnabled = asyncMergeEnabled;
this.condFuncCode = condFuncCode;
this.projectionCode1 = projectionCode1;
this.projectionCode2 = projectionCode2;
@@ -159,7 +171,6 @@ public class SortMergeJoinFunction implements Serializable {
+ ", please increase manage memory of task
manager.");
}
- Configuration conf = taskContainer.getJobConfiguration();
// sorter1
this.sorter1 =
new BinaryExternalSorter(
@@ -171,7 +182,10 @@ public class SortMergeJoinFunction implements Serializable
{
serializer1,
computer1.newInstance(cl),
comparator1.newInstance(cl),
- conf);
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled);
this.sorter1.startThreads();
// sorter2
@@ -185,7 +199,10 @@ public class SortMergeJoinFunction implements Serializable
{
serializer2,
computer2.newInstance(cl),
comparator2.newInstance(cl),
- conf);
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled);
this.sorter2.startThreads();
keyComparator = genKeyComparator.newInstance(cl);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
index 33faa8bf72b..22185c278b5 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/AbstractBinaryExternalMerger.java
@@ -51,7 +51,7 @@ public abstract class AbstractBinaryExternalMerger<Entry>
implements Closeable {
private final int maxFanIn;
private final SpillChannelManager channelManager;
- private final boolean compressionEnable;
+ private final boolean compressionEnabled;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
@@ -63,14 +63,14 @@ public abstract class AbstractBinaryExternalMerger<Entry>
implements Closeable {
int pageSize,
int maxFanIn,
SpillChannelManager channelManager,
- boolean compressionEnable,
+ boolean compressionEnabled,
BlockCompressionFactory compressionCodecFactory,
int compressionBlockSize) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.maxFanIn = maxFanIn;
this.channelManager = channelManager;
- this.compressionEnable = compressionEnable;
+ this.compressionEnabled = compressionEnabled;
this.compressionCodecFactory = compressionCodecFactory;
this.compressionBlockSize = compressionBlockSize;
}
@@ -102,7 +102,7 @@ public abstract class AbstractBinaryExternalMerger<Entry>
implements Closeable {
ioManager,
channel,
openChannels,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
pageSize);
@@ -185,7 +185,7 @@ public abstract class AbstractBinaryExternalMerger<Entry>
implements Closeable {
FileChannelUtil.createOutputView(
ioManager,
mergedChannelID,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
pageSize);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
index b68f936cab7..74467c5c761 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalMerger.java
@@ -46,7 +46,7 @@ public class BinaryExternalMerger extends
AbstractBinaryExternalMerger<BinaryRow
SpillChannelManager channelManager,
BinaryRowDataSerializer serializer,
RecordComparator comparator,
- boolean compressionEnable,
+ boolean compressionEnabled,
BlockCompressionFactory compressionCodecFactory,
int compressionBlockSize) {
super(
@@ -54,7 +54,7 @@ public class BinaryExternalMerger extends
AbstractBinaryExternalMerger<BinaryRow
pageSize,
maxFanIn,
channelManager,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize);
this.serializer = serializer;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
index 149025ebd43..09da136143a 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.operators.sort;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AlgorithmOptions;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
@@ -152,11 +150,11 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
private final int memorySegmentSize;
- private final boolean compressionEnable;
+ private final boolean compressionEnabled;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
- private final boolean asyncMergeEnable;
+ private final boolean asyncMergeEnabled;
// ------------------------------------------------------------------------
// Constructor & Shutdown
@@ -178,7 +176,10 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
BinaryRowDataSerializer serializer,
NormalizedKeyComputer normalizedKeyComputer,
RecordComparator comparator,
- Configuration conf) {
+ int maxNumFileHandles,
+ boolean compressionEnabled,
+ int compressionBlockSize,
+ boolean asyncMergeEnabled) {
this(
owner,
memoryManager,
@@ -188,7 +189,10 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
serializer,
normalizedKeyComputer,
comparator,
- conf,
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled,
AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue());
}
@@ -201,23 +205,19 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
BinaryRowDataSerializer serializer,
NormalizedKeyComputer normalizedKeyComputer,
RecordComparator comparator,
- Configuration conf,
+ int maxNumFileHandles,
+ boolean compressionEnabled,
+ int compressionBlockSize,
+ boolean asyncMergeEnabled,
float startSpillingFraction) {
- int maxNumFileHandles =
-
conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
- this.compressionEnable =
-
conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+ this.compressionEnabled = compressionEnabled;
this.compressionCodecFactory =
- this.compressionEnable
+ this.compressionEnabled
?
BlockCompressionFactory.createBlockCompressionFactory(
BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
: null;
- this.compressionBlockSize =
- (int)
-
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
- .getBytes();
- asyncMergeEnable =
-
conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED);
+ this.compressionBlockSize = compressionBlockSize;
+ this.asyncMergeEnabled = asyncMergeEnabled;
checkArgument(maxNumFileHandles >= 2);
checkNotNull(ioManager);
@@ -256,8 +256,8 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
+ "maxNumFileHandles({}), compressionEnable({}),
compressionCodecFactory({}), compressionBlockSize({}).",
sortMemPages,
maxNumFileHandles,
- compressionEnable,
- compressionEnable ? compressionCodecFactory.getClass() : null,
+ compressionEnabled,
+ compressionEnabled ? compressionCodecFactory.getClass() : null,
compressionBlockSize);
this.sortBuffers = new ArrayList<>();
@@ -314,7 +314,7 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
channelManager,
(BinaryRowDataSerializer) serializer.duplicate(),
comparator,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize);
@@ -1010,7 +1010,7 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
FileChannelUtil.createOutputView(
ioManager,
channel,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
memorySegmentSize);
@@ -1126,7 +1126,7 @@ public class BinaryExternalSorter implements
Sorter<BinaryRowData> {
spillChannelIDs.add(channelID);
// if async merge is disabled, we will only do the final merge
// otherwise we wait for `maxFanIn` number of channels to
begin a merge
- if (!asyncMergeEnable || spillChannelIDs.size() < maxFanIn) {
+ if (!asyncMergeEnabled || spillChannelIDs.size() < maxFanIn) {
continue;
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
index bdd009051af..ca6f65e45fb 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryInMemorySortBuffer.java
@@ -40,7 +40,7 @@ public final class BinaryInMemorySortBuffer extends
BinaryIndexedSortable {
private static final int MIN_REQUIRED_BUFFERS = 3;
- private AbstractRowDataSerializer<RowData> inputSerializer;
+ private final AbstractRowDataSerializer<RowData> inputSerializer;
private final ArrayList<MemorySegment> recordBufferSegments;
private final SimpleCollectingOutputView recordCollector;
private final int totalNumBuffers;
@@ -185,7 +185,7 @@ public final class BinaryInMemorySortBuffer extends
BinaryIndexedSortable {
*
* @return An iterator returning the records in their logical order.
*/
- public final MutableObjectIterator<BinaryRowData> getIterator() {
+ public MutableObjectIterator<BinaryRowData> getIterator() {
return new MutableObjectIterator<BinaryRowData>() {
private final int size = size();
private int current = 0;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
index ed87f8a5048..db98dfd77fe 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVExternalMerger.java
@@ -48,7 +48,7 @@ public class BinaryKVExternalMerger
BinaryRowDataSerializer keySerializer,
BinaryRowDataSerializer valueSerializer,
RecordComparator comparator,
- boolean compressionEnable,
+ boolean compressionEnabled,
BlockCompressionFactory compressionCodecFactory,
int compressionBlockSize) {
super(
@@ -56,7 +56,7 @@ public class BinaryKVExternalMerger
pageSize,
maxFanIn,
channelManager,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize);
this.keySerializer = keySerializer;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
index 4fdf6976d40..38f721e355f 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryKVInMemorySortBuffer.java
@@ -65,8 +65,7 @@ public class BinaryKVInMemorySortBuffer extends
BinaryIndexedSortable {
BinaryRowDataSerializer valueSerializer,
RecordComparator comparator,
ArrayList<MemorySegment> recordBufferSegments,
- MemorySegmentPool memorySegmentPool)
- throws IOException {
+ MemorySegmentPool memorySegmentPool) {
super(
normalizedKeyComputer,
keySerializer,
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
index 02b6aabb8dc..11c252408fe 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.runtime.operators.sort;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
@@ -26,7 +25,6 @@ import
org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
@@ -73,14 +71,14 @@ public class BufferedKVExternalSorter {
private final List<ChannelWithMeta> channelIDs = new ArrayList<>();
private final SpillChannelManager channelManager;
- private int pageSize;
+ private final int pageSize;
// metric
private long numSpillFiles;
private long spillInBytes;
private long spillInCompressedBytes;
- private final boolean compressionEnable;
+ private final boolean compressionEnabled;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
@@ -91,27 +89,23 @@ public class BufferedKVExternalSorter {
NormalizedKeyComputer nKeyComputer,
RecordComparator comparator,
int pageSize,
- Configuration conf)
- throws IOException {
+ int maxNumFileHandles,
+ boolean compressionEnabled,
+ int compressionBlockSize) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.nKeyComputer = nKeyComputer;
this.comparator = comparator;
this.pageSize = pageSize;
this.sorter = new QuickSort();
- this.maxNumFileHandles =
-
conf.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES);
- this.compressionEnable =
-
conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
+ this.maxNumFileHandles = maxNumFileHandles;
+ this.compressionEnabled = compressionEnabled;
this.compressionCodecFactory =
- this.compressionEnable
+ this.compressionEnabled
?
BlockCompressionFactory.createBlockCompressionFactory(
BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
: null;
- this.compressionBlockSize =
- (int)
-
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
- .getBytes();
+ this.compressionBlockSize = compressionBlockSize;
this.ioManager = ioManager;
this.enumerator = this.ioManager.createChannelEnumerator();
this.channelManager = new SpillChannelManager();
@@ -124,7 +118,7 @@ public class BufferedKVExternalSorter {
keySerializer,
valueSerializer,
comparator,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize);
}
@@ -176,7 +170,7 @@ public class BufferedKVExternalSorter {
FileChannelUtil.createOutputView(
ioManager,
channel,
- compressionEnable,
+ compressionEnabled,
compressionCodecFactory,
compressionBlockSize,
pageSize);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
index c09db2cb073..927c5d97e15 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java
@@ -47,14 +47,28 @@ public class SortOperator extends
TableStreamOperator<RowData>
private GeneratedNormalizedKeyComputer gComputer;
private GeneratedRecordComparator gComparator;
+ private final int maxNumFileHandles;
+ private final boolean compressionEnabled;
+ private final int compressionBlockSize;
+ private final boolean asyncMergeEnabled;
+
private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<RowData> collector;
private transient BinaryRowDataSerializer binarySerializer;
public SortOperator(
- GeneratedNormalizedKeyComputer gComputer,
GeneratedRecordComparator gComparator) {
+ GeneratedNormalizedKeyComputer gComputer,
+ GeneratedRecordComparator gComparator,
+ int maxNumFileHandles,
+ boolean compressionEnabled,
+ int compressionBlockSize,
+ boolean asyncMergeEnabled) {
this.gComputer = gComputer;
this.gComparator = gComparator;
+ this.maxNumFileHandles = maxNumFileHandles;
+ this.compressionEnabled = compressionEnabled;
+ this.compressionBlockSize = compressionBlockSize;
+ this.asyncMergeEnabled = asyncMergeEnabled;
}
@Override
@@ -85,7 +99,10 @@ public class SortOperator extends
TableStreamOperator<RowData>
binarySerializer,
computer,
comparator,
- getContainingTask().getJobConfiguration());
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled);
this.sorter.startThreads();
collector = new StreamRecordCollector<>(output);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
index 9d8ee6d07b9..a8e0da893be 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
@@ -49,12 +49,12 @@ public class FileChannelUtil {
IOManager ioManager,
ChannelWithMeta channel,
List<FileIOChannel> channels,
- boolean compressionEnable,
+ boolean compressionEnabled,
BlockCompressionFactory compressionCodecFactory,
int compressionBlockSize,
int segmentSize)
throws IOException {
- if (compressionEnable) {
+ if (compressionEnabled) {
CompressedHeaderlessChannelReaderInputView in =
new CompressedHeaderlessChannelReaderInputView(
channel.getChannel(),
@@ -82,12 +82,12 @@ public class FileChannelUtil {
public static AbstractChannelWriterOutputView createOutputView(
IOManager ioManager,
FileIOChannel.ID channel,
- boolean compressionEnable,
+ boolean compressionEnabled,
BlockCompressionFactory compressionCodecFactory,
int compressionBlockSize,
int segmentSize)
throws IOException {
- if (compressionEnable) {
+ if (compressionEnabled) {
BufferFileWriter bufferWriter =
ioManager.createBufferFileWriter(channel);
return new CompressedHeaderlessChannelWriterOutputView(
bufferWriter, compressionCodecFactory,
compressionBlockSize);
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index b9868059497..0d4d6b2b748 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.hashtable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -28,7 +27,6 @@ import
org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
@@ -53,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -66,7 +65,6 @@ public class BinaryHashTableTest {
private BinaryRowDataSerializer probeSideSerializer;
private boolean useCompress;
- private Configuration conf;
public BinaryHashTableTest(boolean useCompress) {
this.useCompress = useCompress;
@@ -84,9 +82,6 @@ public class BinaryHashTableTest {
this.probeSideSerializer = new BinaryRowDataSerializer(types.length);
this.ioManager = new IOManagerAsync();
-
- conf = new Configuration();
-
conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED,
useCompress);
}
@After
@@ -736,8 +731,9 @@ public class BinaryHashTableTest {
MemoryManagerBuilder.newBuilder().setMemorySize(96 *
PAGE_SIZE).build();
final BinaryHashTable table =
new BinaryHashTable(
- conf,
new Object(),
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
this.buildSideSerializer,
this.probeSideSerializer,
new MyProjection(),
@@ -836,8 +832,9 @@ public class BinaryHashTableTest {
final BinaryHashTable table =
new BinaryHashTable(
- conf,
new Object(),
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
this.buildSideSerializer,
this.probeSideSerializer,
new MyProjection(),
@@ -901,8 +898,9 @@ public class BinaryHashTableTest {
// allocate the memory for the HashTable
final BinaryHashTable table =
new BinaryHashTable(
- conf,
new Object(),
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
this.buildSideSerializer,
this.probeSideSerializer,
new MyProjection(),
@@ -1006,8 +1004,9 @@ public class BinaryHashTableTest {
final BinaryHashTable table =
new BinaryHashTable(
- conf,
new Object(),
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
buildSideSerializer,
probeSideSerializer,
new MyProjection(),
@@ -1071,8 +1070,9 @@ public class BinaryHashTableTest {
final BinaryHashTable table =
new BinaryHashTable(
- conf,
new Object(),
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
buildSideSerializer,
probeSideSerializer,
new MyProjection(),
@@ -1169,8 +1169,9 @@ public class BinaryHashTableTest {
long memory,
IOManager ioManager) {
return new BinaryHashTable(
- conf,
new Object(),
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
buildSideSerializer,
probeSideSerializer,
buildSideProjection,
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
index 8ec312ee392..660dcaf06ae 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
@@ -20,14 +20,12 @@ package org.apache.flink.table.runtime.hashtable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
@@ -48,6 +46,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;
@@ -63,7 +62,6 @@ public class LongHashTableTest {
MemoryManagerBuilder.newBuilder().setMemorySize(896 *
PAGE_SIZE).build();
private boolean useCompress;
- private Configuration conf;
public LongHashTableTest(boolean useCompress) {
this.useCompress = useCompress;
@@ -80,17 +78,15 @@ public class LongHashTableTest {
this.buildSideSerializer = new BinaryRowDataSerializer(types.length);
this.probeSideSerializer = new BinaryRowDataSerializer(types.length);
this.ioManager = new IOManagerAsync();
-
- conf = new Configuration();
-
conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED,
useCompress);
}
private class MyHashTable extends LongHybridHashTable {
public MyHashTable(long memorySize) {
super(
- conf,
LongHashTableTest.this,
+ useCompress,
+ (int)
TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue().getBytes(),
buildSideSerializer,
probeSideSerializer,
memManager,
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
index a280e28eebb..fab38dfd62a 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/HashAggTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.runtime.operators.aggregate;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
@@ -80,11 +79,6 @@ public class HashAggTest {
};
}
- @Override
- Configuration getConf() {
- return new Configuration();
- }
-
@Override
public IOManager getIOManager() {
return ioManager;
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
index 88be5ef2f0b..d10b1fdb54d 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
@@ -19,12 +19,12 @@
package org.apache.flink.table.runtime.operators.aggregate;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -119,7 +119,15 @@ public class SumHashAggTestOperator extends
AbstractStreamOperator<RowData>
new IntNormalizedKeyComputer(),
new IntRecordComparator(),
getMemoryManager().getPageSize(),
- getConf());
+
ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
+ .defaultValue(),
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
+ .defaultValue(),
+ (int)
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+ .defaultValue()
+ .getBytes());
}
// sort and spill
sorter.sortAndSpill(
@@ -255,10 +263,6 @@ public class SumHashAggTestOperator extends
AbstractStreamOperator<RowData>
return getContainingTask().getEnvironment().getMemoryManager();
}
- Configuration getConf() {
- return getContainingTask().getJobConfiguration();
- }
-
public IOManager getIOManager() {
return getContainingTask().getEnvironment().getIOManager();
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
index 0988d272dfc..53a8dab724f 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTestBase.java
@@ -27,6 +27,7 @@ import
org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
@@ -148,6 +149,18 @@ public abstract class Int2HashJoinOperatorTestBase
implements Serializable {
};
boolean[] filterNulls = new boolean[] {true};
+ int maxNumFileHandles =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+ boolean compressionEnabled =
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+ int compressionBlockSize =
+ (int)
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+ .defaultValue()
+ .getBytes();
+ boolean asyncMergeEnabled =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
+
SortMergeJoinFunction sortMergeJoinFunction;
if (buildLeft) {
sortMergeJoinFunction =
@@ -155,6 +168,10 @@ public abstract class Int2HashJoinOperatorTestBase
implements Serializable {
0,
flinkJoinType,
buildLeft,
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled,
condFuncCode,
buildProjectionCode,
probeProjectionCode,
@@ -170,6 +187,10 @@ public abstract class Int2HashJoinOperatorTestBase
implements Serializable {
0,
flinkJoinType,
buildLeft,
+ maxNumFileHandles,
+ compressionEnabled,
+ compressionBlockSize,
+ asyncMergeEnabled,
condFuncCode,
probeProjectionCode,
buildProjectionCode,
@@ -184,6 +205,8 @@ public abstract class Int2HashJoinOperatorTestBase
implements Serializable {
return HashJoinOperator.newHashJoinOperator(
hashJoinType,
buildLeft,
+ compressionEnabled,
+ compressionBlockSize,
condFuncCode,
reverseJoinFunction,
filterNulls,
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
index 9deb124cb5c..73368b5470c 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/Int2SortMergeJoinOperatorTest.java
@@ -23,6 +23,7 @@ import
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
@@ -215,10 +216,25 @@ public class Int2SortMergeJoinOperatorTest {
}
public static SortMergeJoinFunction getJoinFunction(FlinkJoinType type,
boolean leftIsSmaller) {
+ int maxNumFileHandles =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+ boolean compressionEnable =
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+ int compressionBlockSize =
+ (int)
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+ .defaultValue()
+ .getBytes();
+ boolean asyncMergeEnable =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
return new SortMergeJoinFunction(
0,
type,
leftIsSmaller,
+ maxNumFileHandles,
+ compressionEnable,
+ compressionBlockSize,
+ asyncMergeEnable,
new GeneratedJoinCondition("", "", new Object[0]) {
@Override
public JoinCondition newInstance(ClassLoader classLoader) {
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index ed6ff982eba..bb75ffd16e5 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -25,6 +25,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -398,11 +399,27 @@ public class String2HashJoinOperatorTest implements
Serializable {
};
boolean[] filterNulls = new boolean[] {true};
+ int maxNumFileHandles =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+ boolean compressionEnable =
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+ int compressionBlockSize =
+ (int)
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+ .defaultValue()
+ .getBytes();
+ boolean asyncMergeEnable =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
+
SortMergeJoinFunction sortMergeJoinFunction =
new SortMergeJoinFunction(
0,
flinkJoinType,
buildLeft,
+ maxNumFileHandles,
+ compressionEnable,
+ compressionBlockSize,
+ asyncMergeEnable,
condFuncCode,
probeProjectionCode,
buildProjectionCode,
@@ -416,6 +433,8 @@ public class String2HashJoinOperatorTest implements
Serializable {
return HashJoinOperator.newHashJoinOperator(
hashJoinType,
buildLeft,
+ compressionEnable,
+ compressionBlockSize,
condFuncCode,
reverseJoinFunction,
filterNulls,
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index cb1ea8357ca..faab30bccd4 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
@@ -185,11 +186,26 @@ public class String2SortMergeJoinOperatorTest {
}
static StreamOperator newOperator(FlinkJoinType type, boolean
leftIsSmaller) {
+ int maxNumFileHandles =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue();
+ boolean compressionEnable =
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue();
+ int compressionBlockSize =
+ (int)
+
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
+ .defaultValue()
+ .getBytes();
+ boolean asyncMergeEnable =
+
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue();
SortMergeJoinFunction sortMergeJoinFunction =
new SortMergeJoinFunction(
0,
type,
leftIsSmaller,
+ maxNumFileHandles,
+ compressionEnable,
+ compressionBlockSize,
+ asyncMergeEnable,
new GeneratedJoinCondition("", "", new Object[0]) {
@Override
public JoinCondition newInstance(ClassLoader
classLoader) {
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
index 052bf244b05..f88caa5696f 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorterTest.java
@@ -134,7 +134,14 @@ public class BinaryExternalSorterTest {
serializer,
IntNormalizedKeyComputer.INSTANCE,
IntRecordComparator.INSTANCE,
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
1f);
sorter.startThreads();
sorter.write(reader);
@@ -174,7 +181,14 @@ public class BinaryExternalSorterTest {
serializer,
IntNormalizedKeyComputer.INSTANCE,
IntRecordComparator.INSTANCE,
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
0.7f);
sorter.startThreads();
sorter.write(reader);
@@ -215,7 +229,14 @@ public class BinaryExternalSorterTest {
}
},
IntRecordComparator.INSTANCE,
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
0.7f);
sorter.startThreads();
sorter.write(new MockBinaryRowReader(size));
@@ -257,7 +278,14 @@ public class BinaryExternalSorterTest {
serializer,
IntNormalizedKeyComputer.INSTANCE,
IntRecordComparator.INSTANCE,
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
0.7f);
sorter.startThreads();
sorter.write(reader);
@@ -305,7 +333,14 @@ public class BinaryExternalSorterTest {
return -super.compare(o1, o2);
}
},
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
0.7f);
sorter.startThreads();
sorter.write(reader);
@@ -351,7 +386,14 @@ public class BinaryExternalSorterTest {
serializer,
IntNormalizedKeyComputer.INSTANCE,
IntRecordComparator.INSTANCE,
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
0.7f);
sorter.startThreads();
sorter.write(reader);
@@ -389,7 +431,14 @@ public class BinaryExternalSorterTest {
serializer,
IntNormalizedKeyComputer.INSTANCE,
IntRecordComparator.INSTANCE,
- conf,
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes(),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED),
0.7f);
sorter.startThreads();
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
index 95f5bf922ab..484310610ad 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorterTest.java
@@ -115,7 +115,13 @@ public class BufferedKVExternalSorterTest {
computer,
comparator,
PAGE_SIZE,
- conf);
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES),
+
conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED),
+ (int)
+ conf.get(
+ ExecutionConfigOptions
+
.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)
+ .getBytes());
TestMemorySegmentPool pool = new TestMemorySegmentPool(PAGE_SIZE);
List<Integer> expected = new ArrayList<>();
for (int i = 0; i < spillNumber; i++) {