This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 5e38cd1dc [core] Use Flink Managed Memory for RocksDB in cross partition update (#2366)
5e38cd1dc is described below
commit 5e38cd1dc1b00f2fa2c134a3331162f93b541628
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 22 22:36:54 2023 +0800
    [core] Use Flink Managed Memory for RocksDB in cross partition update (#2366)
---
.../generated/flink_connector_configuration.html | 6 +++
.../paimon/crosspartition/GlobalIndexAssigner.java | 11 ++++-
.../crosspartition/GlobalIndexAssignerTest.java | 9 ++--
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 ++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 7 +---
.../flink/sink/MultiTablesCompactorSink.java | 8 +---
.../paimon/flink/sink/PrepareCommitOperator.java | 20 ++-------
.../flink/sink/index/GlobalDynamicBucketSink.java | 10 ++++-
.../sink/index/GlobalIndexAssignerOperator.java | 3 ++
.../paimon/flink/utils/ManagedMemoryUtils.java | 49 ++++++++++++++++++++++
10 files changed, 98 insertions(+), 33 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 87262763b..dcd934223 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -128,6 +128,12 @@ under the License.
<td>MemorySize</td>
            <td>Sink committer memory to control heap memory of global committer.</td>
</tr>
+ <tr>
+ <td><h5>sink.cross-partition.managed-memory</h5></td>
+ <td style="word-wrap: break-word;">256 mb</td>
+ <td>MemorySize</td>
+            <td>Weight of managed memory for RocksDB in cross-partition update. Flink will compute the memory size according to the weight; the actual memory used depends on the running environment.</td>
+ </tr>
<tr>
<td><h5>sink.managed.writer-buffer-memory</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
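
For readers who want to try the new option, here is a minimal sketch of overriding the weight from a Flink job. The table names t and s and the 512 mb value are hypothetical; only the option key and its 256 mb default come from this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CrossPartitionMemorySketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Hypothetical tables; assumes a Paimon catalog is already registered.
            // The OPTIONS hint overrides the managed-memory weight for this write only.
            tableEnv.executeSql(
                    "INSERT INTO t /*+ OPTIONS('sink.cross-partition.managed-memory' = '512 mb') */"
                            + " SELECT * FROM s");
        }
    }
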
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 10821d87e..bcd83bc69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -32,6 +32,7 @@ import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.lookup.RocksDBValueState;
import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.AbstractFileStoreTable;
@@ -66,6 +67,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Assign UPDATE_BEFORE and bucket for the input record, output record with bucket. */
@@ -106,6 +108,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
// ================== Start Public API ===================
public void open(
+ long offHeapMemory,
IOManager ioManager,
int numAssigners,
int assignId,
@@ -133,9 +136,15 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
ThreadLocalRandom.current().nextInt(ioManager.tempDirs().length)];
this.path = new File(rocksDBDir, "rocksdb-" + UUID.randomUUID());
+        Options rocksdbOptions = Options.fromMap(new HashMap<>(options.toMap()));
+        // avoid a block cache that is too small
+        long blockCache = Math.max(offHeapMemory, rocksdbOptions.get(BLOCK_CACHE_SIZE).getBytes());
+ rocksdbOptions.set(BLOCK_CACHE_SIZE, new MemorySize(blockCache));
this.stateFactory =
new RocksDBStateFactory(
-                        path.toString(), options, coreOptions.crossPartitionUpsertIndexTtl());
+ path.toString(),
+ rocksdbOptions,
+ coreOptions.crossPartitionUpsertIndexTtl());
RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
this.keyIndex =
stateFactory.valueState(
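
A note on the sizing rule added above: the RocksDB block cache ends up as the larger of the Flink-managed off-heap budget and the configured block-cache size, so a small managed-memory weight cannot shrink a cache the user configured explicitly. A minimal illustration (the concrete values are made up, not from this commit):

    // Illustrative values only.
    long offHeapMemory = 256L << 20;   // bytes Flink granted from managed memory
    long configuredCache = 128L << 20; // value of RocksDBOptions.BLOCK_CACHE_SIZE
    long blockCache = Math.max(offHeapMemory, configuredCache); // 256 MiB wins here
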
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
index 7a4ae602b..347ede6e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
@@ -93,7 +93,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
private void innerTestBucketAssign(boolean enableTtl) throws Exception {
        GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
List<Integer> output = new ArrayList<>();
- assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(bucket));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> output.add(bucket));
assigner.endBoostrap(false);
// assign
@@ -127,7 +127,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
public void testUpsert() throws Exception {
GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
List<Pair<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
assigner.endBoostrap(false);
// change partition
@@ -171,7 +171,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
: MergeEngine.AGGREGATE;
GlobalIndexAssigner assigner = createAssigner(mergeEngine);
List<Pair<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
assigner.endBoostrap(false);
// change partition
@@ -195,7 +195,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
public void testFirstRow() throws Exception {
GlobalIndexAssigner assigner = createAssigner(MergeEngine.FIRST_ROW);
List<Pair<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
+        assigner.open(0, ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket)));
assigner.endBoostrap(false);
// change partition
@@ -218,6 +218,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
List<List<Integer>> output = new ArrayList<>();
assigner.open(
+ 0,
ioManager(),
2,
0,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index d612a003e..2e3bec814 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -237,6 +237,14 @@ public class FlinkConnectorOptions {
"Weight of writer buffer in managed memory, Flink
will compute the memory size "
+ "for writer according to the weight, the
actual memory used depends on the running environment.");
+    public static final ConfigOption<MemorySize> SINK_CROSS_PARTITION_MANAGED_MEMORY =
+            ConfigOptions.key("sink.cross-partition.managed-memory")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(256))
+                    .withDescription(
+                            "Weight of managed memory for RocksDB in cross-partition update. "
+                                    + "Flink will compute the memory size according to the weight; "
+                                    + "the actual memory used depends on the running environment.");
+
public static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
ConfigOptions.key("scan.push-down")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 01d77bb8b..c9a6f3c05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -54,6 +53,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Abstract sink of paimon. */
@@ -172,10 +172,7 @@ public abstract class FlinkSink<T> implements Serializable {
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
-            MemorySize memorySize = options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
-            written.getTransformation()
-                    .declareManagedMemoryUseCaseAtOperatorScope(
-                            ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+            declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
return written;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index 8b6ad8891..d9ded153d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -22,13 +22,11 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -45,6 +43,7 @@ import java.util.UUID;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** A sink for processing multi-tables in dedicated compaction job. */
@@ -105,10 +104,7 @@ public class MultiTablesCompactorSink implements Serializable {
}
if (options.get(SINK_USE_MANAGED_MEMORY)) {
-            MemorySize memorySize = options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
-            written.getTransformation()
-                    .declareManagedMemoryUseCaseAtOperatorScope(
-                            ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+            declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
return written;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
index d4397309b..7dbab7f2d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
@@ -23,8 +23,6 @@ import org.apache.paimon.flink.memory.MemorySegmentAllocator;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.Options;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
-import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -41,6 +39,7 @@ import java.io.IOException;
import java.util.List;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.utils.ManagedMemoryUtils.computeManagedMemory;
/** Prepare commit operator to emit {@link Committable}s. */
public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOperator<OUT>
@@ -67,23 +66,12 @@ public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOpera
            memoryAllocator = new MemorySegmentAllocator(containingTask, memoryManager);
memoryPool =
new FlinkMemorySegmentPool(
-                        computeMemorySize(), memoryManager.getPageSize(), memoryAllocator);
+ computeManagedMemory(this),
+ memoryManager.getPageSize(),
+ memoryAllocator);
}
}
-    /** Compute memory size from memory fraction. */
- private long computeMemorySize() {
- final Environment environment = getContainingTask().getEnvironment();
- return environment
- .getMemoryManager()
- .computeMemorySize(
- getOperatorConfig()
- .getManagedMemoryFractionOperatorUseCaseOfSlot(
- ManagedMemoryUseCase.OPERATOR,
-                                        environment.getTaskManagerInfo().getConfiguration(),
-                                        environment.getUserCodeClassLoader().asClassLoader()));
- }
-
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
if (!endOfInput) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 589f86a86..99b78936b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.index;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
@@ -45,7 +46,9 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.paimon.crosspartition.IndexBootstrap.bootstrapType;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_CROSS_PARTITION_MANAGED_MEMORY;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
/** Sink for global dynamic bucket table. */
public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> {
@@ -67,6 +70,7 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
String initialCommitUser = UUID.randomUUID().toString();
TableSchema schema = table.schema();
+ CoreOptions options = table.coreOptions();
RowType rowType = schema.logicalRowType();
List<String> primaryKeys = schema.primaryKeys();
        InternalRowTypeSerializer rowSerializer = new InternalRowTypeSerializer(rowType);
@@ -89,7 +93,7 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
.setParallelism(input.getParallelism());
// 1. shuffle by key hash
-        Integer assignerParallelism = table.coreOptions().dynamicBucketAssignerParallelism();
+        Integer assignerParallelism = options.dynamicBucketAssignerParallelism();
if (assignerParallelism == null) {
assignerParallelism = parallelism;
}
@@ -110,6 +114,10 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
GlobalIndexAssignerOperator.forRowData(table))
.setParallelism(partitionByKeyHash.getParallelism());
+ // declare managed memory for RocksDB
+ declareManagedMemory(
+                bucketAssigned, options.toConfiguration().get(SINK_CROSS_PARTITION_MANAGED_MEMORY));
+
// 3. shuffle by bucket
DataStream<Tuple2<InternalRow, Integer>> partitionByBucket =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 4cb56435c..bec047762 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import static org.apache.paimon.flink.utils.ManagedMemoryUtils.computeManagedMemory;
+
/** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
public class GlobalIndexAssignerOperator
extends AbstractStreamOperator<Tuple2<InternalRow, Integer>>
@@ -54,6 +56,7 @@ public class GlobalIndexAssignerOperator
getContainingTask().getEnvironment().getIOManager();
        ioManager = IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
assigner.open(
+ computeManagedMemory(this),
ioManager,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
new file mode 100644
index 000000000..a51d9e02e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.options.MemorySize;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/** Utils for using Flink managed memory. */
+public class ManagedMemoryUtils {
+
+    public static void declareManagedMemory(DataStream<?> dataStream, MemorySize memorySize) {
+ dataStream
+ .getTransformation()
+ .declareManagedMemoryUseCaseAtOperatorScope(
+                        ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+ }
+
+    public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+        final Environment environment = operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+                                        environment.getTaskManagerInfo().getConfiguration(),
+                                        environment.getUserCodeClassLoader().asClassLoader()));
+ }
+}
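
Taken together, the new utility is used in two phases; the sketch below shows how they pair up. The wrapper class and method names are hypothetical, written only to illustrate how the commit's helpers are meant to be called:

    import org.apache.paimon.flink.utils.ManagedMemoryUtils;
    import org.apache.paimon.options.MemorySize;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

    class ManagedMemoryUsageSketch {

        // Graph-building time: attach a 256 MiB OPERATOR-scope weight to the
        // stream's transformation. Flink divides a slot's managed memory among
        // all declared weights, so this is a share, not a guaranteed size.
        static void declare(DataStream<?> stream) {
            ManagedMemoryUtils.declareManagedMemory(stream, MemorySize.ofMebiBytes(256));
        }

        // Runtime (e.g. in an operator's open()): resolve the declared weight
        // into the number of off-heap bytes this operator may actually use.
        static long resolve(AbstractStreamOperator<?> operator) {
            return ManagedMemoryUtils.computeManagedMemory(operator);
        }
    }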