This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3e8318a2c [flink] Introduce Dynamic bucket sink to Flink (#1357)
3e8318a2c is described below
commit 3e8318a2c6da4521e4354315b1e8ff6ac5413585
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 13 10:00:38 2023 +0800
[flink] Introduce Dynamic bucket sink to Flink (#1357)
---
docs/content/concepts/primary-key-table.md | 11 +++
docs/content/how-to/cdc-ingestion.md | 2 +-
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 12 +++
.../flink/sink/DynamicBucketRowWriteOperator.java | 52 ++++++++++++
.../paimon/flink/sink/DynamicBucketSink.java | 86 ++++++++++++++++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 39 ++++-----
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 8 ++
.../flink/sink/HashBucketAssignerOperator.java | 92 ++++++++++++++++++++++
.../paimon/flink/sink/RowDynamicBucketSink.java | 68 ++++++++++++++++
.../paimon/flink/sink/TableWriteOperator.java | 25 +++---
.../flink/sink/cdc/CdcDynamicBucketSink.java | 80 +++++++++++++++++++
...tor.java => CdcDynamicBucketWriteOperator.java} | 28 +++----
.../flink/sink/cdc/CdcHashKeyChannelComputer.java | 55 +++++++++++++
.../sink/cdc/CdcRecordKeyAndBucketExtractor.java | 15 +++-
.../sink/cdc/CdcRecordStoreWriteOperator.java | 2 +-
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 33 ++++----
.../sink/cdc/CdcWithBucketChannelComputer.java | 56 +++++++++++++
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 48 +++++------
.../paimon/flink/DynamicBucketTableITCase.java | 68 ++++++++++++++++
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 15 +++-
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 12 ++-
22 files changed, 719 insertions(+), 94 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index c1cd5f920..42bd87936 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -32,6 +32,17 @@ Primary keys consist of a set of columns that contain unique
values for each rec
By [defining primary keys]({{< ref
"how-to/creating-tables#tables-with-primary-keys" >}}) on a changelog table,
users can access the following features.
+## Bucket
+
+A bucket is the smallest storage unit for reads and writes; each bucket directory contains an [LSM tree]({{< ref "concepts/file-layouts#lsm-trees" >}}).
+
+Primary Key Table supports two bucket modes:
+1. Fixed Bucket mode: set `'bucket'` to a value greater than 0. Rescaling buckets can only be done through offline processes;
+   see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}). Too many buckets lead to too many
+   small files, and too few buckets lead to poor write performance.
+2. Dynamic Bucket mode: configure `'bucket' = '-1'`. Paimon dynamically maintains the index and keeps the number of rows
+   in each bucket below `'dynamic-bucket.target-row-num'`. (This is an experimental feature.)
+
## Merge Engines
When Paimon sink receives two or more records with the same primary keys, it
will merge them into one record to keep primary keys unique. By specifying the
`merge-engine` table property, users can choose how records are merged together.
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index acd5edc1a..9f1039c76 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -176,7 +176,7 @@ The command to recover from previous snapshot and add new
tables to synchronize
```bash
<FLINK_HOME>/bin/flink run \
- --fromSavepoint {{< savepointPath >}} \
+ --fromSavepoint savepointPath \
/path/to/paimon-flink-action-{{< version >}}.jar \
mysql-sync-database \
--warehouse hdfs:///path/to/warehouse \
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 872a8da15..75debc1ac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@ under the License.
<td>Duration</td>
<td>The discovery interval of continuous reading.</td>
</tr>
+ <tr>
+ <td><h5>dynamic-bucket.target-row-num</h5></td>
+ <td style="word-wrap: break-word;">2000000</td>
+ <td>Long</td>
+ <td>If the bucket is -1, for primary key table, is dynamic bucket
mode, this option controls the target row number for one bucket.</td>
+ </tr>
<tr>
<td><h5>dynamic-partition-overwrite</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index d38150de0..0d195ac39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -648,6 +648,14 @@ public class CoreOptions implements Serializable {
"The expiration interval of consumer files. A
consumer file will be expired if "
+ "it's lifetime after last modification
is over this value.");
+ public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
+ key("dynamic-bucket.target-row-num")
+ .longType()
+ .defaultValue(2_000_000L)
+ .withDescription(
+ "If the bucket is -1, for primary key table, is
dynamic bucket mode, "
+ + "this option controls the target row
number for one bucket.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -837,6 +845,10 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
}
+ public long dynamicBucketTargetRowNum() {
+ return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
+ }
+
public ChangelogProducer changelogProducer() {
return options.get(CHANGELOG_PRODUCER);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
new file mode 100644
index 000000000..7db9c2802
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/** A {@link PrepareCommitOperator} to write {@link RowData} with bucket.
Record schema is fixed. */
+public class DynamicBucketRowWriteOperator extends
TableWriteOperator<Tuple2<RowData, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DynamicBucketRowWriteOperator(
+ FileStoreTable table,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser) {
+ super(table, storeSinkWriteProvider, initialCommitUser);
+ }
+
+ @Override
+ protected boolean containLogSystem() {
+ return false;
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<RowData, Integer>> element)
throws Exception {
+ FlinkRowWrapper internalRow = new
FlinkRowWrapper(element.getValue().f0);
+ DynamicBucketRow row = new DynamicBucketRow(internalRow,
element.getValue().f1);
+ write.write(row);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
new file mode 100644
index 000000000..e4f804832
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static
org.apache.paimon.flink.sink.FlinkStreamPartitioner.createPartitionTransformation;
+
+/** Sink for dynamic bucket table. */
+public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T,
Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DynamicBucketSink(
+ FileStoreTable table,
+ Lock.Factory lockFactory,
+ @Nullable Map<String, String> overwritePartition) {
+ super(table, overwritePartition, lockFactory);
+ }
+
+ protected abstract ChannelComputer<T> channelComputer1();
+
+ protected abstract ChannelComputer<Tuple2<T, Integer>> channelComputer2();
+
+ protected abstract SerializableFunction<TableSchema,
PartitionKeyExtractor<T>>
+ extractorFunction();
+
+ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer
parallelism) {
+ String initialCommitUser = UUID.randomUUID().toString();
+
+ // Topology:
+ // input -- shuffle by key hash --> bucket-assigner -- shuffle by
bucket --> writer -->
+ // committer
+
+ // 1. shuffle by key hash
+ DataStream<T> partitionByKeyHash =
+ createPartitionTransformation(input, channelComputer1(),
parallelism);
+
+ // 2. bucket-assigner
+ HashBucketAssignerOperator<T> assignerOperator =
+ new HashBucketAssignerOperator<>(initialCommitUser, table,
extractorFunction());
+ TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
+ new TupleTypeInfo<>(partitionByKeyHash.getType(),
BasicTypeInfo.INT_TYPE_INFO);
+ DataStream<Tuple2<T, Integer>> bucketAssigned =
+ partitionByKeyHash.transform(
+ "dynamic-bucket-assigner", rowWithBucketType,
assignerOperator);
+
+ // 3. shuffle by bucket
+ DataStream<Tuple2<T, Integer>> partitionByBucket =
+ createPartitionTransformation(bucketAssigned,
channelComputer2(), parallelism);
+
+ // 4. writer and committer
+ return sinkFrom(partitionByBucket, initialCommitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 6ab2ddd4c..a88523219 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -125,7 +125,16 @@ public abstract class FlinkSink<T> implements Serializable
{
return sinkFrom(input, initialCommitUser);
}
- public DataStreamSink<?> sinkFrom(DataStream<T> input, String commitUser) {
+ public DataStreamSink<?> sinkFrom(DataStream<T> input, String
initialCommitUser) {
+
+ return sinkFrom(
+ input,
+ initialCommitUser,
+
createWriteProvider(input.getExecutionEnvironment().getCheckpointConfig()));
+ }
+
+ public DataStreamSink<?> sinkFrom(
+ DataStream<T> input, String commitUser, StoreSinkWrite.Provider
sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -138,24 +147,12 @@ public abstract class FlinkSink<T> implements
Serializable {
assertCheckpointConfiguration(env);
}
- SingleOutputStreamOperator<Committable> written =
- handleInput(input, isStreaming, commitUser);
-
- return commit(written, streamingCheckpointEnabled, commitUser);
- }
-
- protected SingleOutputStreamOperator<Committable> handleInput(
- DataStream<T> input, boolean isStreaming, String commitUser) {
+ CommittableTypeInfo typeInfo = new CommittableTypeInfo();
SingleOutputStreamOperator<Committable> written =
input.transform(
WRITER_NAME + " -> " + table.name(),
- new CommittableTypeInfo(),
- createWriteOperator(
- createWriteProvider(
- input.getExecutionEnvironment()
-
.getCheckpointConfig()),
- isStreaming,
- commitUser))
+ typeInfo,
+ createWriteOperator(sinkProvider, isStreaming,
commitUser))
.setParallelism(input.getParallelism());
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
@@ -164,17 +161,11 @@ public abstract class FlinkSink<T> implements
Serializable {
.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.OPERATOR,
memorySize.getMebiBytes());
}
- return written;
- }
- protected DataStreamSink<?> commit(
- SingleOutputStreamOperator<Committable> written,
- boolean streamingCheckpointEnabled,
- String commitUser) {
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " -> " + table.name(),
- new CommittableTypeInfo(),
+ typeInfo,
new CommitterOperator<>(
streamingCheckpointEnabled,
commitUser,
@@ -185,7 +176,7 @@ public abstract class FlinkSink<T> implements Serializable {
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
}
- public static void
assertCheckpointConfiguration(StreamExecutionEnvironment env) {
+ private void assertCheckpointConfiguration(StreamExecutionEnvironment env)
{
Preconditions.checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
"Paimon sink currently does not support unaligned checkpoints.
Please set "
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 84f031614..1aa48d805 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
import java.util.Map;
import static
org.apache.paimon.flink.sink.FlinkStreamPartitioner.createPartitionTransformation;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Builder for {@link FileStoreSink}. */
public class FlinkSinkBuilder {
@@ -78,12 +79,19 @@ public class FlinkSinkBuilder {
case FIXED:
return buildForFixedBucket();
case DYNAMIC:
+ return buildDynamicBucketSink();
case UNAWARE:
default:
throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
}
}
+ private DataStreamSink<?> buildDynamicBucketSink() {
+ checkArgument(logSinkFunction == null, "Dynamic bucket mode can not
work with log system.");
+ return new RowDynamicBucketSink(table, lockFactory, overwritePartition)
+ .build(input, parallelism);
+ }
+
private DataStreamSink<?> buildForFixedBucket() {
DataStream<RowData> partitioned =
createPartitionTransformation(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
new file mode 100644
index 000000000..2d256d9bf
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/** Assign bucket for the input record, output record with bucket. */
+public class HashBucketAssignerOperator<T> extends
AbstractStreamOperator<Tuple2<T, Integer>>
+ implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String initialCommitUser;
+
+ private final AbstractFileStoreTable table;
+ private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction;
+
+ private transient HashBucketAssigner assigner;
+ private transient PartitionKeyExtractor<T> extractor;
+
+ public HashBucketAssignerOperator(
+ String commitUser,
+ Table table,
+ SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction) {
+ this.initialCommitUser = commitUser;
+ this.table = (AbstractFileStoreTable) table;
+ this.extractorFunction = extractorFunction;
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+
+ // Each job can only have one user name and this name must be
consistent across restarts.
+ // We cannot use job id as commit user name here because user may
change job id by creating
+ // a savepoint, stop the job and then resume from savepoint.
+ String commitUser =
+ StateUtils.getSingleValueFromState(
+ context, "commit_user_state", String.class,
initialCommitUser);
+
+ this.assigner =
+ new HashBucketAssigner(
+ table.snapshotManager(),
+ commitUser,
+ table.store().newIndexFileHandler(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ table.coreOptions().dynamicBucketTargetRowNum());
+ this.extractor = extractorFunction.apply(table.schema());
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> streamRecord) throws Exception {
+ T value = streamRecord.getValue();
+ int bucket =
+ assigner.assign(
+ extractor.partition(value),
extractor.trimmedPrimaryKey(value).hashCode());
+ output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) {
+ assigner.prepareCommit(checkpointId);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
new file mode 100644
index 000000000..b6678a5df
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Sink for dynamic bucket table. */
+public class RowDynamicBucketSink extends DynamicBucketSink<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ public RowDynamicBucketSink(
+ FileStoreTable table,
+ Lock.Factory lockFactory,
+ @Nullable Map<String, String> overwritePartition) {
+ super(table, lockFactory, overwritePartition);
+ }
+
+ @Override
+ protected ChannelComputer<RowData> channelComputer1() {
+ return new RowHashKeyChannelComputer(table.schema());
+ }
+
+ @Override
+ protected ChannelComputer<Tuple2<RowData, Integer>> channelComputer2() {
+ return new RowWithBucketChannelComputer(table.schema());
+ }
+
+ @Override
+ protected SerializableFunction<TableSchema, PartitionKeyExtractor<RowData>>
+ extractorFunction() {
+ return RowDataPartitionKeyExtractor::new;
+ }
+
+ @Override
+ protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable>
createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming, String
commitUser) {
+ return new DynamicBucketRowWriteOperator(table, writeProvider,
commitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 1c7f53a5e..b7ed60f68 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -59,15 +60,17 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
- RowDataChannelComputer channelComputer =
- new RowDataChannelComputer(table.schema(), containLogSystem());
-
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
- state =
- new StoreSinkWriteState(
- context,
- (tableName, partition, bucket) ->
- channelComputer.channel(partition, bucket)
- ==
getRuntimeContext().getIndexOfThisSubtask());
+ boolean containLogSystem = containLogSystem();
+ int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ StateValueFilter stateFilter =
+ (tableName, partition, bucket) -> {
+ int task =
+ containLogSystem
+ ? ChannelComputer.select(bucket, numTasks)
+ : ChannelComputer.select(partition,
bucket, numTasks);
+ return task == getRuntimeContext().getIndexOfThisSubtask();
+ };
+ state = new StoreSinkWriteState(context, stateFilter);
write =
storeSinkWriteProvider.provide(
@@ -91,7 +94,9 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
@Override
public void close() throws Exception {
super.close();
- write.close();
+ if (write != null) {
+ write.close();
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
new file mode 100644
index 000000000..02cd8c105
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.DynamicBucketSink;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/** Sink for dynamic bucket table. */
+public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
+
+ private static final long serialVersionUID = 1L;
+
+ public CdcDynamicBucketSink(FileStoreTable table, Lock.Factory
lockFactory) {
+ super(table, lockFactory, null);
+ }
+
+ @Override
+ protected ChannelComputer<CdcRecord> channelComputer1() {
+ return new CdcHashKeyChannelComputer(table.schema());
+ }
+
+ @Override
+ protected ChannelComputer<Tuple2<CdcRecord, Integer>> channelComputer2() {
+ return new CdcWithBucketChannelComputer(table.schema());
+ }
+
+ @Override
+ protected SerializableFunction<TableSchema,
PartitionKeyExtractor<CdcRecord>>
+ extractorFunction() {
+ return schema -> {
+ CdcRecordKeyAndBucketExtractor extractor = new
CdcRecordKeyAndBucketExtractor(schema);
+ return new PartitionKeyExtractor<CdcRecord>() {
+ @Override
+ public BinaryRow partition(CdcRecord record) {
+ extractor.setRecord(record);
+ return extractor.partition();
+ }
+
+ @Override
+ public BinaryRow trimmedPrimaryKey(CdcRecord record) {
+ extractor.setRecord(record);
+ return extractor.trimmedPrimaryKey();
+ }
+ };
+ };
+ }
+
+ @Override
+ protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable>
createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming, String
commitUser) {
+ return new CdcDynamicBucketWriteOperator(table, writeProvider,
commitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
similarity index 73%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index 04ae9ea38..64fb65910 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -22,35 +22,29 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.IOException;
-import java.time.Duration;
import java.util.Optional;
+import static
org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
/**
- * A {@link PrepareCommitOperator} to write {@link CdcRecord}. Record schema
may change. If current
- * known schema does not fit record schema, this operator will wait for schema
changes.
+ * A {@link PrepareCommitOperator} to write {@link CdcRecord} with bucket.
Record schema is fixed.
*/
-public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord>
{
+public class CdcDynamicBucketWriteOperator extends
TableWriteOperator<Tuple2<CdcRecord, Integer>> {
private static final long serialVersionUID = 1L;
- static final ConfigOption<Duration> RETRY_SLEEP_TIME =
- ConfigOptions.key("cdc.retry-sleep-time")
- .durationType()
- .defaultValue(Duration.ofMillis(500));
-
private final long retrySleepMillis;
- public CdcRecordStoreWriteOperator(
+ public CdcDynamicBucketWriteOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
@@ -71,13 +65,13 @@ public class CdcRecordStoreWriteOperator extends
TableWriteOperator<CdcRecord> {
}
@Override
- public void processElement(StreamRecord<CdcRecord> element) throws
Exception {
- CdcRecord record = element.getValue();
- Optional<GenericRow> optionalConverted = toGenericRow(record,
table.schema().fields());
+ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>>
element) throws Exception {
+ Tuple2<CdcRecord, Integer> record = element.getValue();
+ Optional<GenericRow> optionalConverted = toGenericRow(record.f0,
table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
table = table.copyWithLatestSchema();
- optionalConverted = toGenericRow(record,
table.schema().fields());
+ optionalConverted = toGenericRow(record.f0,
table.schema().fields());
if (optionalConverted.isPresent()) {
break;
}
@@ -87,7 +81,7 @@ public class CdcRecordStoreWriteOperator extends
TableWriteOperator<CdcRecord> {
}
try {
- write.write(optionalConverted.get());
+ write.write(new DynamicBucketRow(optionalConverted.get(),
record.f1));
} catch (Exception e) {
throw new IOException(e);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
new file mode 100644
index 000000000..918233749
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.schema.TableSchema;
+
+/** Routes a {@link CdcRecord} to a channel by the hash of its trimmed primary key. */
+public class CdcHashKeyChannelComputer implements ChannelComputer<CdcRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels; // set in setup(int)
+    private transient CdcRecordKeyAndBucketExtractor extractor; // created in setup(int)
+
+    public CdcHashKeyChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+    }
+
+    @Override
+    public int channel(CdcRecord record) {
+        extractor.setRecord(record);
+        int hash = extractor.trimmedPrimaryKey().hashCode();
+        return Math.abs(hash % numChannels); // abs after %: no Integer.MIN_VALUE overflow
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by key hash";
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
index 9bd00f966..f3ab1654e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
@@ -41,10 +41,13 @@ public class CdcRecordKeyAndBucketExtractor implements
KeyAndBucketExtractor<Cdc
private final Projection partitionProjection;
private final List<DataField> bucketKeyFields;
private final Projection bucketKeyProjection;
+ private final List<DataField> trimmedPKFields;
+ private final Projection trimmedPKProjection;
private CdcRecord record;
private BinaryRow partition;
+ private BinaryRow trimmedPK;
private BinaryRow bucketKey;
private Integer bucket;
@@ -62,6 +65,12 @@ public class CdcRecordKeyAndBucketExtractor implements
KeyAndBucketExtractor<Cdc
this.bucketKeyProjection =
CodeGenUtils.newProjection(
bucketKeyType, IntStream.range(0,
bucketKeyType.getFieldCount()).toArray());
+
+ this.trimmedPKFields = schema.trimmedPrimaryKeysFields();
+ this.trimmedPKProjection =
+ CodeGenUtils.newProjection(
+ new RowType(trimmedPKFields),
+ IntStream.range(0, trimmedPKFields.size()).toArray());
}
@Override
@@ -70,6 +79,7 @@ public class CdcRecordKeyAndBucketExtractor implements
KeyAndBucketExtractor<Cdc
this.partition = null;
this.bucketKey = null;
+ this.trimmedPK = null;
this.bucket = null;
}
@@ -96,7 +106,10 @@ public class CdcRecordKeyAndBucketExtractor implements
KeyAndBucketExtractor<Cdc
@Override
public BinaryRow trimmedPrimaryKey() {
- throw new UnsupportedOperationException();
+ if (trimmedPK == null) {
+ trimmedPK = trimmedPKProjection.apply(projectAsInsert(record,
trimmedPKFields));
+ }
+ return trimmedPK;
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index 04ae9ea38..dd0aa2e56 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -43,7 +43,7 @@ public class CdcRecordStoreWriteOperator extends
TableWriteOperator<CdcRecord> {
private static final long serialVersionUID = 1L;
- static final ConfigOption<Duration> RETRY_SLEEP_TIME =
+ public static final ConfigOption<Duration> RETRY_SLEEP_TIME =
ConfigOptions.key("cdc.retry-sleep-time")
.durationType()
.defaultValue(Duration.ofMillis(500));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 8a35101f7..06051664e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -88,20 +88,6 @@ public class CdcSinkBuilder<T> {
FileStoreTable dataTable = (FileStoreTable) table;
- BucketMode bucketMode = dataTable.bucketMode();
- switch (bucketMode) {
- case FIXED:
- return buildForFixedBucket();
- case DYNAMIC:
- case UNAWARE:
- default:
- throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
- }
- }
-
- private DataStreamSink<?> buildForFixedBucket() {
- FileStoreTable dataTable = (FileStoreTable) table;
-
SingleOutputStreamOperator<CdcRecord> parsed =
input.forward()
.process(new
CdcParsingProcessFunction<>(parserFactory))
@@ -117,6 +103,25 @@ public class CdcSinkBuilder<T> {
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+ BucketMode bucketMode = dataTable.bucketMode();
+ switch (bucketMode) {
+ case FIXED:
+ return buildForFixedBucket(parsed);
+ case DYNAMIC:
+ return buildForDynamicBucket(parsed);
+ case UNAWARE:
+ default:
+ throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
+ }
+ }
+
+ private DataStreamSink<?> buildForDynamicBucket(DataStream<CdcRecord>
parsed) {
+ return new CdcDynamicBucketSink((FileStoreTable) table, lockFactory)
+ .build(parsed, parallelism);
+ }
+
+ private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord>
parsed) {
+ FileStoreTable dataTable = (FileStoreTable) table;
FlinkStreamPartitioner<CdcRecord> partitioner =
new FlinkStreamPartitioner<>(new
CdcRecordChannelComputer(dataTable.schema()));
PartitionTransformation<CdcRecord> partitioned =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
new file mode 100644
index 000000000..a24e6e3a0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.schema.TableSchema;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** Routes a {@link CdcRecord} paired with its bucket to a channel by partition and bucket. */
+public class CdcWithBucketChannelComputer implements
ChannelComputer<Tuple2<CdcRecord, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels; // set in setup(int)
+    private transient CdcRecordKeyAndBucketExtractor extractor; // created in setup(int)
+
+    public CdcWithBucketChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+    }
+
+    @Override
+    public int channel(Tuple2<CdcRecord, Integer> record) {
+        extractor.setRecord(record.f0); // f0: the CDC record, f1: its bucket
+        return ChannelComputer.select(extractor.partition(), record.f1,
numChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by partition & bucket";
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index d01bb1696..dd3ca0ae7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import javax.annotation.Nullable;
@@ -84,20 +83,37 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
Preconditions.checkNotNull(input);
Preconditions.checkNotNull(parserFactory);
- StreamExecutionEnvironment env = input.getExecutionEnvironment();
-
SingleOutputStreamOperator<Void> parsed =
input.forward()
.process(new
CdcMultiTableParsingProcessFunction<>(parserFactory))
.setParallelism(input.getParallelism());
for (FileStoreTable table : tables) {
+ DataStream<Void> schemaChangeProcessFunction =
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
+ CdcMultiTableParsingProcessFunction
+
.createUpdatedDataFieldsOutputTag(table.name()))
+ .process(
+ new UpdatedDataFieldsProcessFunction(
+ new SchemaManager(table.fileIO(),
table.location())));
+ schemaChangeProcessFunction.getTransformation().setParallelism(1);
+
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+
+ DataStream<CdcRecord> parsedForTable =
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
+
CdcMultiTableParsingProcessFunction.createRecordOutputTag(
+ table.name()));
+
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case FIXED:
- buildForFixedBucket(table, parsed);
+ buildForFixedBucket(table, parsedForTable);
break;
case DYNAMIC:
+ buildForDynamicBucket(table, parsedForTable);
+ break;
case UNAWARE:
default:
throw new UnsupportedOperationException(
@@ -106,29 +122,15 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
}
}
- private void buildForFixedBucket(
- FileStoreTable table, SingleOutputStreamOperator<Void> parsed) {
- DataStream<Void> schemaChangeProcessFunction =
- SingleOutputStreamOperatorUtils.getSideOutput(
- parsed,
- CdcMultiTableParsingProcessFunction
-
.createUpdatedDataFieldsOutputTag(table.name()))
- .process(
- new UpdatedDataFieldsProcessFunction(
- new SchemaManager(table.fileIO(),
table.location())));
- schemaChangeProcessFunction.getTransformation().setParallelism(1);
- schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+ private void buildForDynamicBucket(FileStoreTable table,
DataStream<CdcRecord> parsed) {
+ new CdcDynamicBucketSink(table, lockFactory).build(parsed,
parallelism);
+ }
+ private void buildForFixedBucket(FileStoreTable table,
DataStream<CdcRecord> parsed) {
FlinkStreamPartitioner<CdcRecord> partitioner =
new FlinkStreamPartitioner<>(new
CdcRecordChannelComputer(table.schema()));
PartitionTransformation<CdcRecord> partitioned =
- new PartitionTransformation<>(
- SingleOutputStreamOperatorUtils.getSideOutput(
- parsed,
-
CdcMultiTableParsingProcessFunction.createRecordOutputTag(
- table.name()))
- .getTransformation(),
- partitioner);
+ new PartitionTransformation<>(parsed.getTransformation(),
partitioner);
if (parallelism != null) {
partitioned.setParallelism(parallelism);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
new file mode 100644
index 000000000..4b55537f2
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for writing and reading a table in dynamic bucket mode ('bucket' = '-1'). */
+public class DynamicBucketTableITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T ("
+                        + "pt INT, "
+                        + "pk INT, "
+                        + "v INT, "
+                        + "PRIMARY KEY (pt, pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3' "
+                        + ")");
+    }
+
+    @Test
+    public void testWriteRead() {
+        sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4),
(1, 5, 5)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 1),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 3),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+        sql("INSERT INTO T VALUES (1, 3, 33), (1, 1, 11)"); // upsert existing keys 3 and 1
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 11),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 33),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+        // Expect the 5 distinct keys (target-row-num = 3) to span exactly buckets 0 and 1.
+        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 1df02a2f1..d3cf58e0f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -53,6 +53,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
/** IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}. */
public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
@@ -65,6 +66,16 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
@Test
@Timeout(120)
public void testRandomCdcEvents() throws Exception {
+ innerTestRandomCdcEvents(() -> ThreadLocalRandom.current().nextInt(5)
+ 1);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEventsDynamicBucket() throws Exception {
+ innerTestRandomCdcEvents(() -> -1);
+ }
+
+ private void innerTestRandomCdcEvents(Supplier<Integer> bucket) throws
Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numTables = random.nextInt(3) + 1;
@@ -74,7 +85,6 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
int maxSchemaChanges = 10;
int maxPartitions = 3;
int maxKeys = 150;
- int maxBuckets = 5;
String failingName = UUID.randomUUID().toString();
@@ -121,7 +131,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
testTable.initialRowType(),
Collections.singletonList("pt"),
Arrays.asList("pt", "k"),
- random.nextInt(maxBuckets) + 1);
+ bucket.get());
fileStoreTables.add(fileStoreTable);
}
@@ -177,6 +187,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
throws Exception {
Options conf = new Options();
conf.set(CoreOptions.BUCKET, numBucket);
+ conf.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 100L);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index fac8cb761..e47cb03d9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -60,13 +60,22 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
@Test
@Timeout(120)
public void testRandomCdcEvents() throws Exception {
+ innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEventsDynamicBucket() throws Exception {
+ innerTestRandomCdcEvents(-1);
+ }
+
+ private void innerTestRandomCdcEvents(int numBucket) throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numEvents = random.nextInt(1500) + 1;
int numSchemaChanges = Math.min(numEvents / 2, random.nextInt(10) + 1);
int numPartitions = random.nextInt(3) + 1;
int numKeys = random.nextInt(150) + 1;
- int numBucket = random.nextInt(5) + 1;
boolean enableFailure = random.nextBoolean();
TestTable testTable =
@@ -141,6 +150,7 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
throws Exception {
Options conf = new Options();
conf.set(CoreOptions.BUCKET, numBucket);
+ conf.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 100L);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));