This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e38cd1dc [core] Use Flink Managed Memory for RocksDB in cross 
partition update (#2366)
5e38cd1dc is described below

commit 5e38cd1dc1b00f2fa2c134a3331162f93b541628
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 22 22:36:54 2023 +0800

    [core] Use Flink Managed Memory for RocksDB in cross partition update 
(#2366)
---
 .../generated/flink_connector_configuration.html   |  6 +++
 .../paimon/crosspartition/GlobalIndexAssigner.java | 11 ++++-
 .../crosspartition/GlobalIndexAssignerTest.java    |  9 ++--
 .../apache/paimon/flink/FlinkConnectorOptions.java |  8 ++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  7 +---
 .../flink/sink/MultiTablesCompactorSink.java       |  8 +---
 .../paimon/flink/sink/PrepareCommitOperator.java   | 20 ++-------
 .../flink/sink/index/GlobalDynamicBucketSink.java  | 10 ++++-
 .../sink/index/GlobalIndexAssignerOperator.java    |  3 ++
 .../paimon/flink/utils/ManagedMemoryUtils.java     | 49 ++++++++++++++++++++++
 10 files changed, 98 insertions(+), 33 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 87262763b..dcd934223 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -128,6 +128,12 @@ under the License.
             <td>MemorySize</td>
             <td>Sink committer memory to control heap memory of global 
committer.</td>
         </tr>
+        <tr>
+            <td><h5>sink.cross-partition.managed-memory</h5></td>
+            <td style="word-wrap: break-word;">256 mb</td>
+            <td>MemorySize</td>
+            <td>Weight of managed memory for RocksDB in cross-partition 
update, Flink will compute the memory size according to the weight, the actual 
memory used depends on the running environment.</td>
+        </tr>
         <tr>
             <td><h5>sink.managed.writer-buffer-memory</h5></td>
             <td style="word-wrap: break-word;">256 mb</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 10821d87e..bcd83bc69 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -32,6 +32,7 @@ import org.apache.paimon.lookup.RocksDBOptions;
 import org.apache.paimon.lookup.RocksDBStateFactory;
 import org.apache.paimon.lookup.RocksDBValueState;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.AbstractFileStoreTable;
@@ -66,6 +67,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Assign UPDATE_BEFORE and bucket for the input record, output record with 
bucket. */
@@ -106,6 +108,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
     // ================== Start Public API ===================
 
     public void open(
+            long offHeapMemory,
             IOManager ioManager,
             int numAssigners,
             int assignId,
@@ -133,9 +136,15 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
                         
ThreadLocalRandom.current().nextInt(ioManager.tempDirs().length)];
         this.path = new File(rocksDBDir, "rocksdb-" + UUID.randomUUID());
 
+        Options rocksdbOptions = Options.fromMap(new 
HashMap<>(options.toMap()));
+        // we should avoid too small memory
+        long blockCache = Math.max(offHeapMemory, 
rocksdbOptions.get(BLOCK_CACHE_SIZE).getBytes());
+        rocksdbOptions.set(BLOCK_CACHE_SIZE, new MemorySize(blockCache));
         this.stateFactory =
                 new RocksDBStateFactory(
-                        path.toString(), options, 
coreOptions.crossPartitionUpsertIndexTtl());
+                        path.toString(),
+                        rocksdbOptions,
+                        coreOptions.crossPartitionUpsertIndexTtl());
         RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
         this.keyIndex =
                 stateFactory.valueState(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
index 7a4ae602b..347ede6e4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
@@ -93,7 +93,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     private void innerTestBucketAssign(boolean enableTtl) throws Exception {
         GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, 
enableTtl);
         List<Integer> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(bucket));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> 
output.add(bucket));
         assigner.endBoostrap(false);
 
         // assign
@@ -127,7 +127,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     public void testUpsert() throws Exception {
         GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
         List<Pair<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> 
output.add(Pair.of(row, bucket)));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> 
output.add(Pair.of(row, bucket)));
         assigner.endBoostrap(false);
 
         // change partition
@@ -171,7 +171,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                         : MergeEngine.AGGREGATE;
         GlobalIndexAssigner assigner = createAssigner(mergeEngine);
         List<Pair<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> 
output.add(Pair.of(row, bucket)));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> 
output.add(Pair.of(row, bucket)));
         assigner.endBoostrap(false);
 
         // change partition
@@ -195,7 +195,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     public void testFirstRow() throws Exception {
         GlobalIndexAssigner assigner = createAssigner(MergeEngine.FIRST_ROW);
         List<Pair<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> 
output.add(Pair.of(row, bucket)));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> 
output.add(Pair.of(row, bucket)));
         assigner.endBoostrap(false);
 
         // change partition
@@ -218,6 +218,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
         GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
         List<List<Integer>> output = new ArrayList<>();
         assigner.open(
+                0,
                 ioManager(),
                 2,
                 0,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index d612a003e..2e3bec814 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -237,6 +237,14 @@ public class FlinkConnectorOptions {
                             "Weight of writer buffer in managed memory, Flink 
will compute the memory size "
                                     + "for writer according to the weight, the 
actual memory used depends on the running environment.");
 
+    public static final ConfigOption<MemorySize> 
SINK_CROSS_PARTITION_MANAGED_MEMORY =
+            ConfigOptions.key("sink.cross-partition.managed-memory")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(256))
+                    .withDescription(
+                            "Weight of managed memory for RocksDB in 
cross-partition update, Flink will compute the memory size "
+                                    + "according to the weight, the actual 
memory used depends on the running environment.");
+
     public static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
             ConfigOptions.key("scan.push-down")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 01d77bb8b..c9a6f3c05 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -54,6 +53,7 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static 
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Abstract sink of paimon. */
@@ -172,10 +172,7 @@ public abstract class FlinkSink<T> implements Serializable 
{
 
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
-            MemorySize memorySize = 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
-            written.getTransformation()
-                    .declareManagedMemoryUseCaseAtOperatorScope(
-                            ManagedMemoryUseCase.OPERATOR, 
memorySize.getMebiBytes());
+            declareManagedMemory(written, 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
         }
         return written;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index 8b6ad8891..d9ded153d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -22,13 +22,11 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -45,6 +43,7 @@ import java.util.UUID;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static 
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A sink for processing multi-tables in dedicated compaction job. */
@@ -105,10 +104,7 @@ public class MultiTablesCompactorSink implements 
Serializable {
         }
 
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
-            MemorySize memorySize = 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
-            written.getTransformation()
-                    .declareManagedMemoryUseCaseAtOperatorScope(
-                            ManagedMemoryUseCase.OPERATOR, 
memorySize.getMebiBytes());
+            declareManagedMemory(written, 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
         }
         return written;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
index d4397309b..7dbab7f2d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
@@ -23,8 +23,6 @@ import org.apache.paimon.flink.memory.MemorySegmentAllocator;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.Options;
 
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -41,6 +39,7 @@ import java.io.IOException;
 import java.util.List;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static 
org.apache.paimon.flink.utils.ManagedMemoryUtils.computeManagedMemory;
 
 /** Prepare commit operator to emit {@link Committable}s. */
 public abstract class PrepareCommitOperator<IN, OUT> extends 
AbstractStreamOperator<OUT>
@@ -67,23 +66,12 @@ public abstract class PrepareCommitOperator<IN, OUT> 
extends AbstractStreamOpera
             memoryAllocator = new MemorySegmentAllocator(containingTask, 
memoryManager);
             memoryPool =
                     new FlinkMemorySegmentPool(
-                            computeMemorySize(), memoryManager.getPageSize(), 
memoryAllocator);
+                            computeManagedMemory(this),
+                            memoryManager.getPageSize(),
+                            memoryAllocator);
         }
     }
 
-    /** Compute memory size from memory faction. */
-    private long computeMemorySize() {
-        final Environment environment = getContainingTask().getEnvironment();
-        return environment
-                .getMemoryManager()
-                .computeMemorySize(
-                        getOperatorConfig()
-                                .getManagedMemoryFractionOperatorUseCaseOfSlot(
-                                        ManagedMemoryUseCase.OPERATOR,
-                                        
environment.getTaskManagerInfo().getConfiguration(),
-                                        
environment.getUserCodeClassLoader().asClassLoader()));
-    }
-
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         if (!endOfInput) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 589f86a86..99b78936b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.index;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.crosspartition.IndexBootstrap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.Committable;
@@ -45,7 +46,9 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.paimon.crosspartition.IndexBootstrap.bootstrapType;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_CROSS_PARTITION_MANAGED_MEMORY;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static 
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 
 /** Sink for global dynamic bucket table. */
 public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow, Integer>> {
@@ -67,6 +70,7 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow,
         String initialCommitUser = UUID.randomUUID().toString();
 
         TableSchema schema = table.schema();
+        CoreOptions options = table.coreOptions();
         RowType rowType = schema.logicalRowType();
         List<String> primaryKeys = schema.primaryKeys();
         InternalRowTypeSerializer rowSerializer = new 
InternalRowTypeSerializer(rowType);
@@ -89,7 +93,7 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow,
                         .setParallelism(input.getParallelism());
 
         // 1. shuffle by key hash
-        Integer assignerParallelism = 
table.coreOptions().dynamicBucketAssignerParallelism();
+        Integer assignerParallelism = 
options.dynamicBucketAssignerParallelism();
         if (assignerParallelism == null) {
             assignerParallelism = parallelism;
         }
@@ -110,6 +114,10 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow,
                                 GlobalIndexAssignerOperator.forRowData(table))
                         .setParallelism(partitionByKeyHash.getParallelism());
 
+        // declare managed memory for RocksDB
+        declareManagedMemory(
+                bucketAssigned, 
options.toConfiguration().get(SINK_CROSS_PARTITION_MANAGED_MEMORY));
+
         // 3. shuffle by bucket
 
         DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 4cb56435c..bec047762 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static 
org.apache.paimon.flink.utils.ManagedMemoryUtils.computeManagedMemory;
+
 /** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
 public class GlobalIndexAssignerOperator
         extends AbstractStreamOperator<Tuple2<InternalRow, Integer>>
@@ -54,6 +56,7 @@ public class GlobalIndexAssignerOperator
                 getContainingTask().getEnvironment().getIOManager();
         ioManager = 
IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
         assigner.open(
+                computeManagedMemory(this),
                 ioManager,
                 getRuntimeContext().getNumberOfParallelSubtasks(),
                 getRuntimeContext().getIndexOfThisSubtask(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
new file mode 100644
index 000000000..a51d9e02e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.options.MemorySize;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/** Utils for using Flink managed memory. */
+public class ManagedMemoryUtils {
+
+    public static void declareManagedMemory(DataStream<?> dataStream, 
MemorySize memorySize) {
+        dataStream
+                .getTransformation()
+                .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, 
memorySize.getMebiBytes());
+    }
+
+    public static long computeManagedMemory(AbstractStreamOperator<?> 
operator) {
+        final Environment environment = 
operator.getContainingTask().getEnvironment();
+        return environment
+                .getMemoryManager()
+                .computeMemorySize(
+                        operator.getOperatorConfig()
+                                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                                        ManagedMemoryUseCase.OPERATOR,
+                                        
environment.getTaskManagerInfo().getConfiguration(),
+                                        
environment.getUserCodeClassLoader().asClassLoader()));
+    }
+}

Reply via email to