This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 004e374b3d56954c1303bfc7b4d2c51a3158a257
Author: Xintong Song <[email protected]>
AuthorDate: Thu Jan 7 11:20:02 2021 +0800

    [FLINK-20860][core] Rename ManagedMemoryUseCase#BATCH_OP to OPERATOR.
---
 .../flink/core/memory/ManagedMemoryUseCase.java    |  2 +-
 .../apache/flink/api/dag/TransformationTest.java   | 10 ++++----
 .../util/config/memory/ManagedMemoryUtils.java     |  2 +-
 .../util/config/memory/ManagedMemoryUtilsTest.java | 10 ++++----
 .../io/StreamMultipleInputProcessorFactory.java    |  2 +-
 .../runtime/io/StreamTwoInputProcessorFactory.java |  2 +-
 .../runtime/tasks/OneInputStreamTask.java          |  2 +-
 .../runtime/translators/BatchExecutionUtils.java   |  2 +-
 .../api/graph/StreamGraphGeneratorTest.java        |  4 ++--
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  8 +++----
 .../plan/nodes/exec/utils/ExecNodeUtil.java        |  2 +-
 .../codegen/agg/batch/BatchAggTestBase.scala       |  2 +-
 .../runtime/operators/TableStreamOperator.java     |  2 +-
 .../BatchMultipleInputStreamOperator.java          |  4 ++--
 .../TableOperatorWrapperGenerator.java             |  6 ++---
 .../operators/join/Int2HashJoinOperatorTest.java   |  2 +-
 .../join/RandomSortMergeInnerJoinTest.java         |  2 +-
 .../join/String2HashJoinOperatorTest.java          |  2 +-
 .../join/String2SortMergeJoinOperatorTest.java     |  2 +-
 .../TableOperatorWrapperGeneratorTest.java         | 28 +++++++++++-----------
 .../over/BufferDataOverWindowOperatorTest.java     |  2 +-
 21 files changed, 49 insertions(+), 49 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java b/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java
index eb0e157..f1705db 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ManagedMemoryUseCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.util.Preconditions;
 /** Use cases of managed memory. */
 @Internal
 public enum ManagedMemoryUseCase {
-    BATCH_OP(Scope.OPERATOR),
+    OPERATOR(Scope.OPERATOR),
     STATE_BACKEND(Scope.SLOT),
     PYTHON(Scope.SLOT);
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
index 5adf33c..494fc9d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java
@@ -45,12 +45,12 @@ public class TransformationTest extends TestLogger {
     @Test
     public void testDeclareManagedMemoryUseCase() {
         transformation.declareManagedMemoryUseCaseAtOperatorScope(
-                ManagedMemoryUseCase.BATCH_OP, 123);
+                ManagedMemoryUseCase.OPERATOR, 123);
         transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.STATE_BACKEND);
         assertThat(
                 transformation
                         .getManagedMemoryOperatorScopeUseCaseWeights()
-                        .get(ManagedMemoryUseCase.BATCH_OP),
+                        .get(ManagedMemoryUseCase.OPERATOR),
                 is(123));
         assertThat(
                 transformation.getManagedMemorySlotScopeUseCases(),
@@ -64,18 +64,18 @@ public class TransformationTest extends TestLogger {
 
     @Test(expected = IllegalArgumentException.class)
     public void testDeclareManagedMemoryOperatorScopeUseCaseFailZeroWeight() {
-        transformation.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 0);
+        transformation.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testDeclareManagedMemoryOperatorScopeUseCaseFailNegativeWeight() {
         transformation.declareManagedMemoryUseCaseAtOperatorScope(
-                ManagedMemoryUseCase.BATCH_OP, -1);
+                ManagedMemoryUseCase.OPERATOR, -1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
-        transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.BATCH_OP);
+        transformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.OPERATOR);
     }
 
     /** A test implementation of {@link Transformation}. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
index db4970a..8fe0e03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
@@ -121,7 +121,7 @@ public enum ManagedMemoryUtils {
                             switch (consumer) {
                                 case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC:
                                     return Stream.of(
-                                            Tuple2.of(ManagedMemoryUseCase.BATCH_OP, weight),
+                                            Tuple2.of(ManagedMemoryUseCase.OPERATOR, weight),
                                             Tuple2.of(ManagedMemoryUseCase.STATE_BACKEND, weight));
                                 case TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON:
                                     return Stream.of(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
index 8e5fa21..650a1c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
@@ -73,7 +73,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                 new HashMap<ManagedMemoryUseCase, Integer>() {
                     {
                         put(ManagedMemoryUseCase.STATE_BACKEND, DATA_PROC_WEIGHT);
-                        put(ManagedMemoryUseCase.BATCH_OP, DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.OPERATOR, DATA_PROC_WEIGHT);
                         put(ManagedMemoryUseCase.PYTHON, PYTHON_WEIGHT);
                     }
                 };
@@ -117,7 +117,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
 
     @Test
     public void testConvertToFractionOfSlot() {
-        final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.BATCH_OP;
+        final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.OPERATOR;
         final double fractionOfUseCase = 0.3;
 
         final double fractionOfSlot =
@@ -126,7 +126,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         fractionOfUseCase,
                         new HashSet<ManagedMemoryUseCase>() {
                             {
-                                add(ManagedMemoryUseCase.BATCH_OP);
+                                add(ManagedMemoryUseCase.OPERATOR);
                                 add(ManagedMemoryUseCase.PYTHON);
                             }
                         },
@@ -139,7 +139,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
 
     @Test
     public void testConvertToFractionOfSlotWeightNotConfigured() {
-        final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.BATCH_OP;
+        final ManagedMemoryUseCase useCase = ManagedMemoryUseCase.OPERATOR;
         final double fractionOfUseCase = 0.3;
 
         final Configuration config =
@@ -157,7 +157,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         fractionOfUseCase,
                         new HashSet<ManagedMemoryUseCase>() {
                             {
-                                add(ManagedMemoryUseCase.BATCH_OP);
+                                add(ManagedMemoryUseCase.OPERATOR);
                                 add(ManagedMemoryUseCase.PYTHON);
                             }
                         },
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index c60c5c7..c1f0fd5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -165,7 +165,7 @@ public class StreamMultipleInputProcessorFactory {
                             ioManager,
                             executionConfig.isObjectReuseEnabled(),
                             
streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
-                                    ManagedMemoryUseCase.BATCH_OP,
+                                    ManagedMemoryUseCase.OPERATOR,
                                     taskManagerConfig,
                                     userClassloader),
                             jobConfig);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index efdf824..395991d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -139,7 +139,7 @@ public class StreamTwoInputProcessorFactory {
                             ioManager,
                             executionConfig.isObjectReuseEnabled(),
                             
streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
-                                    ManagedMemoryUseCase.BATCH_OP,
+                                    ManagedMemoryUseCase.OPERATOR,
                                     taskManagerConfig,
                                     userClassloader),
                             jobConfig);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0043536..880036b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -128,7 +128,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                 getEnvironment().getIOManager(),
                 getExecutionConfig().isObjectReuseEnabled(),
                 configuration.getManagedMemoryFractionOperatorUseCaseOfSlot(
-                        ManagedMemoryUseCase.BATCH_OP, getTaskConfiguration(), 
userCodeClassLoader),
+                        ManagedMemoryUseCase.OPERATOR, getTaskConfiguration(), 
userCodeClassLoader),
                 getJobConfiguration(),
                 this);
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
index 7e72bd5..963d0ca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
@@ -61,7 +61,7 @@ class BatchExecutionUtils {
                 node.addInputRequirement(i, inputRequirements[i]);
             }
             Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = 
new HashMap<>();
-            operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.BATCH_OP, 1);
+            operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.OPERATOR, 1);
             node.setManagedMemoryUseCaseWeights(
                     operatorScopeUseCaseWeights, Collections.emptySet());
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 14b6e0b..c459bd8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -508,7 +508,7 @@ public class StreamGraphGeneratorTest extends TestLogger {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         final DataStream<Integer> source = env.fromElements(1, 2, 
3).name("source");
         source.getTransformation()
-                
.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
weight);
+                
.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
weight);
         source.print().name("sink");
 
         final StreamGraph streamGraph = env.getStreamGraph();
@@ -517,7 +517,7 @@ public class StreamGraphGeneratorTest extends TestLogger {
                 assertThat(
                         streamNode
                                 .getManagedMemoryOperatorScopeUseCaseWeights()
-                                .get(ManagedMemoryUseCase.BATCH_OP),
+                                .get(ManagedMemoryUseCase.OPERATOR),
                         is(weight));
             } else {
                 
assertThat(streamNode.getManagedMemoryOperatorScopeUseCaseWeights().size(), 
is(0));
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index ba4f97f..df1dc2b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -982,12 +982,12 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
 
         // source: batch
         operatorScopeManagedMemoryUseCaseWeights.add(
-                Collections.singletonMap(ManagedMemoryUseCase.BATCH_OP, 1));
+                Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
         slotScopeManagedMemoryUseCases.add(Collections.emptySet());
 
         // map1: batch, python
         operatorScopeManagedMemoryUseCaseWeights.add(
-                Collections.singletonMap(ManagedMemoryUseCase.BATCH_OP, 1));
+                Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
         
slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
 
         // map3: python
@@ -996,7 +996,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
 
         // map3: batch
         operatorScopeManagedMemoryUseCaseWeights.add(
-                Collections.singletonMap(ManagedMemoryUseCase.BATCH_OP, 1));
+                Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
         slotScopeManagedMemoryUseCases.add(Collections.emptySet());
 
         // slotSharingGroup1 contains batch and python use cases: 
v1(source[batch]) -> map1[batch,
@@ -1121,7 +1121,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
         assertEquals(
                 expectedBatchFrac,
                 streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
-                        ManagedMemoryUseCase.BATCH_OP,
+                        ManagedMemoryUseCase.OPERATOR,
                         tmConfig,
                         ClassLoader.getSystemClassLoader()),
                 delta);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
index a017266..9e7b158 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/ExecNodeUtil.java
@@ -60,7 +60,7 @@ public class ExecNodeUtil {
             int memoryKibiBytes = (int) Math.max(1, (memoryBytes >> 10));
             Optional<Integer> previousWeight =
                     transformation.declareManagedMemoryUseCaseAtOperatorScope(
-                            ManagedMemoryUseCase.BATCH_OP, memoryKibiBytes);
+                            ManagedMemoryUseCase.OPERATOR, memoryKibiBytes);
             if (previousWeight.isPresent()) {
                 throw new TableException(
                         "Managed memory weight has been set, this should not 
happen.");
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
index 57f0b0b..d71a2a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
@@ -75,7 +75,7 @@ abstract class BatchAggTestBase extends 
AggTestBase(isBatchMode = true) {
     val streamConfig = testHarness.getStreamConfig
     streamConfig.setStreamOperatorFactory(args._1)
     streamConfig.setOperatorID(new OperatorID)
-    
streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP,
 .99)
+    
streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR,
 .99)
 
     testHarness.invoke()
     testHarness.waitForTaskRunning()
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
index eac0cf4..5c92161 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
@@ -54,7 +54,7 @@ public class TableStreamOperator<OUT> extends 
AbstractStreamOperator<OUT> {
                 .computeMemorySize(
                         getOperatorConfig()
                                 .getManagedMemoryFractionOperatorUseCaseOfSlot(
-                                        ManagedMemoryUseCase.BATCH_OP,
+                                        ManagedMemoryUseCase.OPERATOR,
                                         
environment.getTaskManagerInfo().getConfiguration(),
                                         
environment.getUserCodeClassLoader().asClassLoader()));
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
index e33ddb1b..b9759b8 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
@@ -73,12 +73,12 @@ public class BatchMultipleInputStreamOperator extends 
MultipleInputStreamOperato
                 multipleInputOperatorParameters
                                 .getStreamConfig()
                                 .getManagedMemoryFractionOperatorUseCaseOfSlot(
-                                        ManagedMemoryUseCase.BATCH_OP,
+                                        ManagedMemoryUseCase.OPERATOR,
                                         taskManagerConfig,
                                         
getRuntimeContext().getUserCodeClassLoader())
                         * wrapper.getManagedMemoryFraction();
         streamConfig.setManagedMemoryFractionOperatorOfUseCase(
-                ManagedMemoryUseCase.BATCH_OP, managedMemoryFraction);
+                ManagedMemoryUseCase.OPERATOR, managedMemoryFraction);
         return streamConfig;
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
index 895c0f0..a5755dc 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
@@ -175,14 +175,14 @@ public class TableOperatorWrapperGenerator {
             managedMemoryWeight =
                     transform
                             .getManagedMemoryOperatorScopeUseCaseWeights()
-                            .getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+                            .getOrDefault(ManagedMemoryUseCase.OPERATOR, 0);
         } else {
             minResources = minResources.merge(transform.getMinResources());
             preferredResources = 
preferredResources.merge(transform.getPreferredResources());
             managedMemoryWeight +=
                     transform
                             .getManagedMemoryOperatorScopeUseCaseWeights()
-                            .getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+                            .getOrDefault(ManagedMemoryUseCase.OPERATOR, 0);
         }
     }
 
@@ -310,7 +310,7 @@ public class TableOperatorWrapperGenerator {
                 fraction =
                         entry.getKey()
                                         
.getManagedMemoryOperatorScopeUseCaseWeights()
-                                        
.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0)
+                                        
.getOrDefault(ManagedMemoryUseCase.OPERATOR, 0)
                                 * 1.0
                                 / this.managedMemoryWeight;
             }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java
index e429f2f..b2e95b3 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/Int2HashJoinOperatorTest.java
@@ -348,7 +348,7 @@ public class Int2HashJoinOperatorTest implements 
Serializable {
         testHarness.getStreamConfig().setOperatorID(new OperatorID());
         testHarness
                 .getStreamConfig()
-                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99);
+                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);
 
         testHarness.invoke();
         testHarness.waitForTaskRunning();
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
index 1f006d3..99f93fd 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java
@@ -290,7 +290,7 @@ public class RandomSortMergeInnerJoinTest {
         testHarness.getStreamConfig().setOperatorID(new OperatorID());
         testHarness
                 .getStreamConfig()
-                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99);
+                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);
 
         long initialTime = 0L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
index 53fa65b..88c06bc 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java
@@ -101,7 +101,7 @@ public class String2HashJoinOperatorTest implements 
Serializable {
         testHarness.getStreamConfig().setOperatorID(new OperatorID());
         testHarness
                 .getStreamConfig()
-                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99);
+                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);
 
         testHarness.invoke();
         testHarness.waitForTaskRunning();
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
index 629284c..6a6c506 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java
@@ -167,7 +167,7 @@ public class String2SortMergeJoinOperatorTest {
         testHarness.getStreamConfig().setOperatorID(new OperatorID());
         testHarness
                 .getStreamConfig()
-                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.BATCH_OP, 0.99);
+                
.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);
 
         long initialTime = 0L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java
index 1266e0b..6776f08 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.java
@@ -69,13 +69,13 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         source1,
                         "agg1",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
1);
+        
agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
1);
         OneInputTransformation<RowData, RowData> agg2 =
                 createOneInputTransform(
                         source2,
                         "agg2",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
2);
+        
agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
2);
         TwoInputTransformation<RowData, RowData, RowData> join =
                 createTwoInputTransform(
                         agg1,
@@ -85,7 +85,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                                 RowType.of(
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType())));
-        
join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
3);
+        
join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
3);
 
         TableOperatorWrapperGenerator generator =
                 new TableOperatorWrapperGenerator(
@@ -141,14 +141,14 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         source1,
                         "agg1",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
1);
+        
agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
1);
 
         OneInputTransformation<RowData, RowData> agg2 =
                 createOneInputTransform(
                         source2,
                         "agg2",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
2);
+        
agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
2);
 
         TwoInputTransformation<RowData, RowData, RowData> join1 =
                 createTwoInputTransform(
@@ -159,7 +159,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                                 RowType.of(
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType())));
-        
join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
3);
+        
join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
3);
 
         TwoInputTransformation<RowData, RowData, RowData> join2 =
                 createTwoInputTransform(
@@ -171,7 +171,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType())));
-        
join2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
4);
+        
join2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
4);
 
         TwoInputTransformation<RowData, RowData, RowData> join3 =
                 createTwoInputTransform(
@@ -182,7 +182,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                                 RowType.of(
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType())));
-        
join3.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
5);
+        
join3.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
5);
 
         TwoInputTransformation<RowData, RowData, RowData> join4 =
                 createTwoInputTransform(
@@ -196,7 +196,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType(),
                                         DataTypes.STRING().getLogicalType())));
-        
join4.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
6);
+        
join4.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
6);
 
         TableOperatorWrapperGenerator generator =
                 new TableOperatorWrapperGenerator(
@@ -269,7 +269,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         source4,
                         "agg1",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
1);
+        
agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
1);
 
         TwoInputTransformation<RowData, RowData, RowData> join1 =
                 createTwoInputTransform(
@@ -277,7 +277,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         union2,
                         "join1",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
2);
+        
join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
2);
 
         UnionTransformation<RowData> union3 = 
createUnionInputTransform("union3", source5, join1);
 
@@ -343,7 +343,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         source1,
                         "calc1",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
calc1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
1);
+        
calc1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
1);
         calc1.setParallelism(100);
 
         OneInputTransformation<RowData, RowData> calc2 =
@@ -351,7 +351,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         source2,
                         "calc2",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
calc2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
1);
+        
calc2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
1);
         calc2.setParallelism(50);
 
         UnionTransformation<RowData> union = 
createUnionInputTransform("union1", calc1, calc2);
@@ -362,7 +362,7 @@ public class TableOperatorWrapperGeneratorTest extends 
MultipleInputTestBase {
                         source3,
                         "join1",
                         
InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType())));
-        
join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.BATCH_OP, 
1);
+        
join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 
1);
         join.setParallelism(200);
 
         TableOperatorWrapperGenerator generator =
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java
index 56bafae..e0dd975 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperatorTest.java
@@ -234,7 +234,7 @@ public class BufferDataOverWindowOperatorTest {
                         
when(conf.<RowData>getTypeSerializerIn1(getUserCodeClassloader()))
                                 .thenReturn(inputSer);
                         
when(conf.getManagedMemoryFractionOperatorUseCaseOfSlot(
-                                        eq(ManagedMemoryUseCase.BATCH_OP),
+                                        eq(ManagedMemoryUseCase.OPERATOR),
                                         any(Configuration.class),
                                         any(ClassLoader.class)))
                                 .thenReturn(0.99);

Reply via email to