This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 004e374b3d56954c1303bfc7b4d2c51a3158a257 Author: Xintong Song <[email protected]> AuthorDate: Thu Jan 7 11:20:02 2021 +0800 [FLINK-20860][core] Rename ManagedMemoryUseCase#BATCH_OP to OPERATOR. --- .../flink/core/memory/ManagedMemoryUseCase.java | 2 +- .../apache/flink/api/dag/TransformationTest.java | 10 ++++---- .../util/config/memory/ManagedMemoryUtils.java | 2 +- .../util/config/memory/ManagedMemoryUtilsTest.java | 10 ++++---- .../io/StreamMultipleInputProcessorFactory.java | 2 +- .../runtime/io/StreamTwoInputProcessorFactory.java | 2 +- .../runtime/tasks/OneInputStreamTask.java | 2 +- .../runtime/translators/BatchExecutionUtils.java | 2 +- .../api/graph/StreamGraphGeneratorTest.java | 4 ++-- .../api/graph/StreamingJobGraphGeneratorTest.java | 8 +++---- .../plan/nodes/exec/utils/ExecNodeUtil.java | 2 +- .../codegen/agg/batch/BatchAggTestBase.scala | 2 +- .../runtime/operators/TableStreamOperator.java | 2 +- .../BatchMultipleInputStreamOperator.java | 4 ++-- .../TableOperatorWrapperGenerator.java | 6 ++--- .../operators/join/Int2HashJoinOperatorTest.java | 2 +- .../join/RandomSortMergeInnerJoinTest.java | 2 +- .../join/String2HashJoinOperatorTest.java | 2 +- .../join/String2SortMergeJoinOperatorTest.java | 2 +- .../TableOperatorWrapperGeneratorTest.java | 28 +++++++++++----------- .../over/BufferDataOverWindowOperatorTest.java | 2 +- 21 files changed, 49 insertions(+), 49 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java b/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java index eb0e157..f1705db 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java @@ -24,7 +24,7 @@ import org.apache.flink.util.Preconditions; /** Use cases of managed memory. */ @Internal public enum ManagedMemoryUseCase { - BATCH_OP(Scope.OPERATOR), + OPERATOR(Scope.OPERATOR), STATE_BACKEND(Scope.SLOT), PYTHON(Scope.SLOT); diff --git a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java index 5adf33c..494fc9d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java @@ -45,12 +45,12 @@ public class TransformationTest extends TestLogger { @Test public void testDeclareManagedMemoryUseCase() { transformation.declareManagedMemoryUseCaseAtOperatorScope( - ManagedMemoryUseCase.BATCH_OP, 123); + ManagedMemoryUseCase.OPERATOR, 123); transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.STATE_BACKEND); assertThat( transformation .getManagedMemoryOperatorScopeUseCaseWeights() - .get(ManagedMemoryUseCase.BATCH_OP), + .get(ManagedMemoryUseCase.OPERATOR), is(123)); assertThat( transformation.getManagedMemorySlotScopeUseCases(), @@ -64,18 +64,18 @@ public class TransformationTest extends TestLogger { @Test(expected = IllegalArgumentException.class) public void testDeclareManagedMemoryOperatorScopeUseCaseFailZeroWeight() { - transformation.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 0); + transformation.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 0); } @Test(expected = IllegalArgumentException.class) public void testDeclareManagedMemoryOperatorScopeUseCaseFailNegativeWeight() { transformation.declareManagedMemoryUseCaseAtOperatorScope( - ManagedMemoryUseCase.BATCH_OP, -1); + ManagedMemoryUseCase.OPERATOR, -1); } @Test(expected = IllegalArgumentException.class) public void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() { - transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.BATCH_OP); + transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.OPERATOR); } /** A test implementation of {@link Transformation}. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java index db4970a..8fe0e03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java @@ -121,7 +121,7 @@ public enum ManagedMemoryUtils { switch (consumer) { case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC: return Stream.of( - Tuple2.of(ManagedMemoryUseCase.BATCH_OP, weight), + Tuple2.of(ManagedMemoryUseCase.OPERATOR, weight), Tuple2.of(ManagedMemoryUseCase.STATE_BACKEND, weight)); case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON: return Stream.of( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java index 8e5fa21..650a1c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java @@ -73,7 +73,7 @@ public class ManagedMemoryUtilsTest extends TestLogger { new HashMap<ManagedMemoryUseCase, Integer>() { { put(ManagedMemoryUseCase.STATE_BACKEND, DATA_PROC_WEIGHT); - put(ManagedMemoryUseCase.BATCH_OP, DATA_PROC_WEIGHT); + put(ManagedMemoryUseCase.OPERATOR, DATA_PROC_WEIGHT); put(ManagedMemoryUseCase.PYTHON, PYTHON_WEIGHT); } }; @@ -117,7 +117,7 @@ public class ManagedMemoryUtilsTest extends TestLogger { @Test public void testConvertToFractionOfSlot() { - final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.BATCH_OP; + final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.OPERATOR; final double fractionOfUseCase = 0.3; final double fractionOfSlot = @@ -126,7 +126,7 @@ public class ManagedMemoryUtilsTest extends TestLogger { fractionOfUseCase, new HashSet<ManagedMemoryUseCase>() { { - add(ManagedMemoryUseCase.BATCH_OP); + add(ManagedMemoryUseCase.OPERATOR); add(ManagedMemoryUseCase.PYTHON); } }, @@ -139,7 +139,7 @@ public class ManagedMemoryUtilsTest extends TestLogger { @Test public void testConvertToFractionOfSlotWeightNotConfigured() { - final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.BATCH_OP; + final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.OPERATOR; final double fractionOfUseCase = 0.3; final Configuration config = @@ -157,7 +157,7 @@ public class ManagedMemoryUtilsTest extends TestLogger { fractionOfUseCase, new HashSet<ManagedMemoryUseCase>() { { - add(ManagedMemoryUseCase.BATCH_OP); + add(ManagedMemoryUseCase.OPERATOR); add(ManagedMemoryUseCase.PYTHON); } }, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java index c60c5c7..c1f0fd5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java @@ -165,7 +165,7 @@ public class StreamMultipleInputProcessorFactory { ioManager, executionConfig.isObjectReuseEnabled(), streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot( - ManagedMemoryUseCase.BATCH_OP, + ManagedMemoryUseCase.OPERATOR, taskManagerConfig, userClassloader), jobConfig); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java index efdf824..395991d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java @@ -139,7 +139,7 @@ public class StreamTwoInputProcessorFactory { ioManager, executionConfig.isObjectReuseEnabled(), streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot( - ManagedMemoryUseCase.BATCH_OP, + ManagedMemoryUseCase.OPERATOR, taskManagerConfig, userClassloader), jobConfig); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 0043536..880036b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -128,7 +128,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO getEnvironment().getIOManager(), getExecutionConfig().isObjectReuseEnabled(), configuration.getManagedMemoryFractionOperatorUseCaseOfSlot( - ManagedMemoryUseCase.BATCH_OP, getTaskConfiguration(), userCodeClassLoader), + ManagedMemoryUseCase.OPERATOR, getTaskConfiguration(), userCodeClassLoader), getJobConfiguration(), this); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java index 7e72bd5..963d0ca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java @@ -61,7 +61,7 @@ class BatchExecutionUtils { node.addInputRequirement(i, inputRequirements[i]); } Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = new HashMap<>(); - operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.BATCH_OP, 1); + operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.OPERATOR, 1); node.setManagedMemoryUseCaseWeights( operatorScopeUseCaseWeights, Collections.emptySet()); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 14b6e0b..c459bd8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -508,7 +508,7 @@ public class StreamGraphGeneratorTest extends TestLogger { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final DataStream<Integer> source = env.fromElements(1, 2, 3).name("source"); source.getTransformation() - .declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, weight); + .declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, weight); source.print().name("sink"); final StreamGraph streamGraph = env.getStreamGraph(); @@ -517,7 +517,7 @@ public class StreamGraphGeneratorTest extends TestLogger { assertThat( streamNode .getManagedMemoryOperatorScopeUseCaseWeights() - .get(ManagedMemoryUseCase.BATCH_OP), + .get(ManagedMemoryUseCase.OPERATOR), is(weight)); } else { assertThat(streamNode.getManagedMemoryOperatorScopeUseCaseWeights().size(), is(0)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index ba4f97f..df1dc2b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -982,12 +982,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { // source: batch operatorScopeManagedMemoryUseCaseWeights.add( - Collections.singletonMap(ManagedMemoryUseCase.BATCH_OP, 1)); + Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1)); slotScopeManagedMemoryUseCases.add(Collections.emptySet()); // map1: batch, python operatorScopeManagedMemoryUseCaseWeights.add( - Collections.singletonMap(ManagedMemoryUseCase.BATCH_OP, 1)); + Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1)); slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON)); // map3: python @@ -996,7 +996,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { // map3: batch operatorScopeManagedMemoryUseCaseWeights.add( - Collections.singletonMap(ManagedMemoryUseCase.BATCH_OP, 1)); + Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1)); slotScopeManagedMemoryUseCases.add(Collections.emptySet()); // slotSharingGroup1 contains batch and python use cases: v1(source[batch]) -> map1[batch, @@ -1121,7 +1121,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertEquals( expectedBatchFrac, streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot( - ManagedMemoryUseCase.BATCH_OP, + ManagedMemoryUseCase.OPERATOR, tmConfig, ClassLoader.getSystemClassLoader()), delta); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java index a017266..9e7b158 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java @@ -60,7 +60,7 @@ public class ExecNodeUtil { int memoryKibiBytes = (int) Math.max(1, (memoryBytes >> 10)); Optional<Integer> previousWeight = transformation.declareManagedMemoryUseCaseAtOperatorScope( - ManagedMemoryUseCase.BATCH_OP, memoryKibiBytes); + ManagedMemoryUseCase.OPERATOR, memoryKibiBytes); if (previousWeight.isPresent()) { throw new TableException( "Managed memory weight has been set, this should not happen."); diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala index 57f0b0b..d71a2a3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala @@ -75,7 +75,7 @@ abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) { val streamConfig = testHarness.getStreamConfig streamConfig.setStreamOperatorFactory(args._1) streamConfig.setOperatorID(new OperatorID) - streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, .99) + streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, .99) testHarness.invoke() testHarness.waitForTaskRunning() diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java index eac0cf4..5c92161 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java @@ -54,7 +54,7 @@ public class TableStreamOperator<OUT> extends AbstractStreamOperator<OUT> { .computeMemorySize( getOperatorConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( - ManagedMemoryUseCase.BATCH_OP, + ManagedMemoryUseCase.OPERATOR, environment.getTaskManagerInfo().getConfiguration(), environment.getUserCodeClassLoader().asClassLoader())); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java index e33ddb1b..b9759b8 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java @@ -73,12 +73,12 @@ public class BatchMultipleInputStreamOperator extends MultipleInputStreamOperato multipleInputOperatorParameters .getStreamConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( - ManagedMemoryUseCase.BATCH_OP, + ManagedMemoryUseCase.OPERATOR, taskManagerConfig, getRuntimeContext().getUserCodeClassLoader()) * wrapper.getManagedMemoryFraction(); streamConfig.setManagedMemoryFractionOperatorOfUseCase( - ManagedMemoryUseCase.BATCH_OP, managedMemoryFraction); + ManagedMemoryUseCase.OPERATOR, managedMemoryFraction); return streamConfig; } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java index 895c0f0..a5755dc 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java @@ -175,14 +175,14 @@ public class TableOperatorWrapperGenerator { managedMemoryWeight = transform .getManagedMemoryOperatorScopeUseCaseWeights() - .getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0); + .getOrDefault(ManagedMemoryUseCase.OPERATOR, 0); } else { minResources = minResources.merge(transform.getMinResources()); preferredResources = preferredResources.merge(transform.getPreferredResources()); managedMemoryWeight += transform .getManagedMemoryOperatorScopeUseCaseWeights() - .getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0); + .getOrDefault(ManagedMemoryUseCase.OPERATOR, 0); } } @@ -310,7 +310,7 @@ public class TableOperatorWrapperGenerator { fraction = entry.getKey() .getManagedMemoryOperatorScopeUseCaseWeights() - .getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0) + .getOrDefault(ManagedMemoryUseCase.OPERATOR, 0) * 1.0 / this.managedMemoryWeight; } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java index e429f2f..b2e95b3 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java @@ -348,7 +348,7 @@ public class Int2HashJoinOperatorTest implements Serializable { testHarness.getStreamConfig().setOperatorID(new OperatorID()); testHarness .getStreamConfig() - .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99); + .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99); testHarness.invoke(); testHarness.waitForTaskRunning(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java index 1f006d3..99f93fd 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java @@ -290,7 +290,7 @@ public class RandomSortMergeInnerJoinTest { testHarness.getStreamConfig().setOperatorID(new OperatorID()); testHarness .getStreamConfig() - .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99); + .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99); long initialTime = 0L; diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java index 53fa65b..88c06bc 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java @@ -101,7 +101,7 @@ public class String2HashJoinOperatorTest implements Serializable { testHarness.getStreamConfig().setOperatorID(new OperatorID()); testHarness .getStreamConfig() - .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99); + .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99); testHarness.invoke(); testHarness.waitForTaskRunning(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java index 629284c..6a6c506 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java @@ -167,7 +167,7 @@ public class String2SortMergeJoinOperatorTest { testHarness.getStreamConfig().setOperatorID(new OperatorID()); testHarness .getStreamConfig() - .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99); + .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99); long initialTime = 0L; diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java index 1266e0b..6776f08 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java @@ -69,13 +69,13 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { source1, "agg1", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 1); + agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1); OneInputTransformation<RowData, RowData> agg2 = createOneInputTransform( source2, "agg2", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 2); + agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2); TwoInputTransformation<RowData, RowData, RowData> join = createTwoInputTransform( agg1, @@ -85,7 +85,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { RowType.of( DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()))); - join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 3); + join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 3); TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator( @@ -141,14 +141,14 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { source1, "agg1", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 1); + agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1); OneInputTransformation<RowData, RowData> agg2 = createOneInputTransform( source2, "agg2", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 2); + agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2); TwoInputTransformation<RowData, RowData, RowData> join1 = createTwoInputTransform( @@ -159,7 +159,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { RowType.of( DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()))); - join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 3); + join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 3); TwoInputTransformation<RowData, RowData, RowData> join2 = createTwoInputTransform( @@ -171,7 +171,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()))); - join2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 4); + join2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 4); TwoInputTransformation<RowData, RowData, RowData> join3 = createTwoInputTransform( @@ -182,7 +182,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { RowType.of( DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()))); - join3.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 5); + join3.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 5); TwoInputTransformation<RowData, RowData, RowData> join4 = createTwoInputTransform( @@ -196,7 +196,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()))); - join4.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 6); + join4.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 6); TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator( @@ -269,7 +269,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { source4, "agg1", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 1); + agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1); TwoInputTransformation<RowData, RowData, RowData> join1 = createTwoInputTransform( @@ -277,7 +277,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { union2, "join1", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 2); + join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2); UnionTransformation<RowData> union3 = createUnionInputTransform("union3", source5, join1); @@ -343,7 +343,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { source1, "calc1", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - calc1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 1); + calc1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1); calc1.setParallelism(100); OneInputTransformation<RowData, RowData> calc2 = @@ -351,7 +351,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { source2, "calc2", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - calc2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 1); + calc2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1); calc2.setParallelism(50); UnionTransformation<RowData> union = createUnionInputTransform("union1", calc1, calc2); @@ -362,7 +362,7 @@ public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase { source3, "join1", InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()))); - join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 1); + join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1); join.setParallelism(200); TableOperatorWrapperGenerator generator = diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java index 56bafae..e0dd975 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java @@ -234,7 +234,7 @@ public class BufferDataOverWindowOperatorTest { when(conf.<RowData>getTypeSerializerIn1(getUserCodeClassloader())) .thenReturn(inputSer); when(conf.getManagedMemoryFractionOperatorUseCaseOfSlot( - eq(ManagedMemoryUseCase.BATCH_OP), + eq(ManagedMemoryUseCase.OPERATOR), any(Configuration.class), any(ClassLoader.class))) .thenReturn(0.99);
