This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ebb1d9c6b [flink] Use union list state to correctly deal with sink
parallelism changes (#868)
ebb1d9c6b is described below
commit ebb1d9c6b581087616ff6c8a9e4e5e49f21731d1
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 11 15:51:23 2023 +0800
[flink] Use union list state to correctly deal with sink parallelism
changes (#868)
---
docs/content/how-to/writing-tables.md | 3 +-
.../generated/flink_connector_configuration.html | 6 -
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 +-
.../paimon/flink/sink/AbstractChannelComputer.java | 71 ------
.../flink/sink/AbstractStoreWriteOperator.java | 236 -----------------
.../flink/sink/BucketingStreamPartitioner.java | 12 +-
...taChannelComputer.java => ChannelComputer.java} | 22 +-
.../apache/paimon/flink/sink/CompactorSink.java | 4 +-
.../apache/paimon/flink/sink/FileStoreSink.java | 4 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 53 ++--
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 37 +--
.../flink/sink/FullChangelogStoreSinkWrite.java | 281 ---------------------
.../flink/sink/GlobalFullCompactionSinkWrite.java | 84 +++---
.../paimon/flink/sink/RowDataChannelComputer.java | 46 +++-
.../flink/sink/RowDataStoreWriteOperator.java | 227 ++++++++++++++++-
.../org/apache/paimon/flink/sink/StateUtils.java | 12 +-
.../paimon/flink/sink/StoreCompactOperator.java | 42 ++-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 12 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 42 +--
.../paimon/flink/sink/StoreSinkWriteState.java | 148 +++++++++++
.../flink/sink/cdc/CdcRecordChannelComputer.java | 39 ++-
.../sink/cdc/CdcRecordStoreWriteOperator.java | 143 +++++++++++
.../apache/paimon/flink/sink/cdc/FlinkCdcSink.java | 14 +-
.../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java | 48 +---
.../sink/cdc/SchemaAwareStoreWriteOperator.java | 89 -------
.../flink/sink/FileStoreShuffleBucketTest.java | 169 -------------
.../flink/sink/RowDataChannelComputerTest.java | 171 +++++++++++++
.../paimon/flink/sink/SinkSavepointITCase.java | 155 ++++++++----
.../sink/cdc/CdcRecordChannelComputerTest.java | 161 ++++++++++++
...t.java => CdcRecordStoreWriteOperatorTest.java} | 16 +-
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 2 -
31 files changed, 1184 insertions(+), 1173 deletions(-)
diff --git a/docs/content/how-to/writing-tables.md
b/docs/content/how-to/writing-tables.md
index 006635938..0b408ed5c 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -98,8 +98,7 @@ Use `INSERT INTO` to apply records and changes to tables.
INSERT INTO MyTable SELECT ...
```
-Paimon supports shuffle data by bucket in sink phase. To improve data skew, Paimon also
-supports shuffling data by partition fields. You can add option `sink.partition-shuffle` to the table.
+Paimon supports shuffling data by partition and bucket in the sink phase.
{{< /tab >}}
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 6d48d12f5..8a5951dab 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -74,12 +74,6 @@
<td>Integer</td>
<td>Defines a custom parallelism for the sink. By default, if this
option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration.</td>
</tr>
- <tr>
- <td><h5>sink.partition-shuffle</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>The option to enable shuffle data by dynamic partition fields
in sink phase for paimon.</td>
- </tr>
<tr>
<td><h5>streaming-read-atomic</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a1302cdfd..f6b0f1110 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -70,13 +70,6 @@ public class FlinkConnectorOptions {
+ "By default, if this option is not
defined, the planner will derive the parallelism "
+ "for each statement individually by also
considering the global configuration.");
- public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
- ConfigOptions.key("sink.partition-shuffle")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "The option to enable shuffle data by dynamic
partition fields in sink phase for paimon.");
-
public static final ConfigOption<Integer> SCAN_PARALLELISM =
ConfigOptions.key("scan.parallelism")
.intType()
@@ -86,6 +79,7 @@ public class FlinkConnectorOptions {
+ "By default, if this option is not
defined, the planner will derive the parallelism "
+ "for each statement individually by also
considering the global configuration. "
+ "If user enable the
scan.infer-parallelism, the planner will derive the parallelism by inferred
parallelism.");
+
public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
ConfigOptions.key("scan.infer-parallelism")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java
deleted file mode 100644
index bb5d364ad..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * A utility class to compute which downstream channel a given record should
be sent to.
- *
- * @param <T> type of record
- */
-public abstract class AbstractChannelComputer<T> {
-
- private final int numChannels;
- private final KeyAndBucketExtractor<T> extractor;
- protected final boolean shuffleByPartitionEnable;
-
- public AbstractChannelComputer(
- int numChannels, KeyAndBucketExtractor<T> extractor, boolean
shuffleByPartitionEnable) {
- this.numChannels = numChannels;
- this.extractor = extractor;
- this.shuffleByPartitionEnable = shuffleByPartitionEnable;
- }
-
- public abstract int channel(T record);
-
- protected int channelImpl(T record, Object... otherChannelKeys) {
- extractor.setRecord(record);
- int bucket = extractor.bucket();
- int otherChannelKeysHash = Objects.hash(otherChannelKeys);
-
- if (shuffleByPartitionEnable) {
- BinaryRow partition = extractor.partition();
- return Math.abs(Objects.hash(bucket, partition,
otherChannelKeysHash)) % numChannels;
- } else {
- return Math.abs(Objects.hash(bucket, otherChannelKeysHash)) %
numChannels;
- }
- }
-
- /**
- * Provider of {@link AbstractChannelComputer}.
- *
- * @param <T> type of record
- */
- public interface Provider<T> extends Serializable {
-
- AbstractChannelComputer<T> provide(int numChannels);
-
- String toString();
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
deleted file mode 100644
index 9f9d34a4e..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractStoreWriteOperator.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.flink.log.LogWriteCallback;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.SinkRecord;
-
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.List;
-
-/** A {@link PrepareCommitOperator} to write records. */
-public abstract class AbstractStoreWriteOperator<T> extends
PrepareCommitOperator<T> {
-
- private static final long serialVersionUID = 2L;
-
- protected FileStoreTable table;
-
- @Nullable private final LogSinkFunction logSinkFunction;
-
- private final StoreSinkWrite.Provider storeSinkWriteProvider;
-
- protected transient StoreSinkWrite write;
-
- private transient SimpleContext sinkContext;
-
- /** We listen to this ourselves because we don't have an {@link
InternalTimerService}. */
- private long currentWatermark = Long.MIN_VALUE;
-
- @Nullable private transient LogWriteCallback logCallback;
-
- public AbstractStoreWriteOperator(
- FileStoreTable table,
- @Nullable LogSinkFunction logSinkFunction,
- StoreSinkWrite.Provider storeSinkWriteProvider) {
- this.table = table;
- this.logSinkFunction = logSinkFunction;
- this.storeSinkWriteProvider = storeSinkWriteProvider;
- }
-
- @Override
- public void setup(
- StreamTask<?, ?> containingTask,
- StreamConfig config,
- Output<StreamRecord<Committable>> output) {
- super.setup(containingTask, config, output);
- if (logSinkFunction != null) {
- FunctionUtils.setFunctionRuntimeContext(logSinkFunction,
getRuntimeContext());
- }
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws
Exception {
- super.initializeState(context);
- write =
- storeSinkWriteProvider.provide(
- table, context,
getContainingTask().getEnvironment().getIOManager());
- if (logSinkFunction != null) {
- StreamingFunctionUtils.restoreFunctionState(context,
logSinkFunction);
- }
- }
-
- @Override
- public void open() throws Exception {
- super.open();
-
- this.sinkContext = new SimpleContext(getProcessingTimeService());
- if (logSinkFunction != null) {
- FunctionUtils.openFunction(logSinkFunction, new Configuration());
- logCallback = new LogWriteCallback();
- logSinkFunction.setWriteCallback(logCallback);
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- super.processWatermark(mark);
-
- this.currentWatermark = mark.getTimestamp();
- if (logSinkFunction != null) {
- logSinkFunction.writeWatermark(
- new
org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
- }
- }
-
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- sinkContext.timestamp = element.hasTimestamp() ?
element.getTimestamp() : null;
-
- SinkRecord record = processRecord(element.getValue());
-
- if (logSinkFunction != null) {
- // write to log store, need to preserve original pk (which
includes partition fields)
- SinkRecord logRecord = write.toLogRecord(record);
- logSinkFunction.invoke(logRecord, sinkContext);
- }
- }
-
- protected abstract SinkRecord processRecord(T record) throws Exception;
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
-
- write.snapshotState(context);
-
- if (logSinkFunction != null) {
- StreamingFunctionUtils.snapshotFunctionState(
- context, getOperatorStateBackend(), logSinkFunction);
- }
- }
-
- @Override
- public void finish() throws Exception {
- super.finish();
-
- if (logSinkFunction != null) {
- logSinkFunction.finish();
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
-
- if (write != null) {
- write.close();
- }
-
- if (logSinkFunction != null) {
- FunctionUtils.closeFunction(logSinkFunction);
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- super.notifyCheckpointComplete(checkpointId);
-
- if (logSinkFunction instanceof CheckpointListener) {
- ((CheckpointListener)
logSinkFunction).notifyCheckpointComplete(checkpointId);
- }
- }
-
- @Override
- public void notifyCheckpointAborted(long checkpointId) throws Exception {
- super.notifyCheckpointAborted(checkpointId);
-
- if (logSinkFunction instanceof CheckpointListener) {
- ((CheckpointListener)
logSinkFunction).notifyCheckpointAborted(checkpointId);
- }
- }
-
- @Override
- protected List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
- throws IOException {
- List<Committable> committables = write.prepareCommit(doCompaction,
checkpointId);
-
- if (logCallback != null) {
- try {
- logSinkFunction.flush();
- } catch (Exception e) {
- throw new IOException(e);
- }
- logCallback
- .offsets()
- .forEach(
- (k, v) ->
- committables.add(
- new Committable(
- checkpointId,
-
Committable.Kind.LOG_OFFSET,
- new
LogOffsetCommittable(k, v))));
- }
-
- return committables;
- }
-
- private class SimpleContext implements SinkFunction.Context {
-
- @Nullable private Long timestamp;
-
- private final ProcessingTimeService processingTimeService;
-
- public SimpleContext(ProcessingTimeService processingTimeService) {
- this.processingTimeService = processingTimeService;
- }
-
- @Override
- public long currentProcessingTime() {
- return processingTimeService.getCurrentProcessingTime();
- }
-
- @Override
- public long currentWatermark() {
- return currentWatermark;
- }
-
- @Override
- public Long timestamp() {
- return timestamp;
- }
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
index c32b56b80..439a97c28 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java
@@ -31,18 +31,16 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
*/
public class BucketingStreamPartitioner<T> extends StreamPartitioner<T> {
- private final AbstractChannelComputer.Provider<T> channelComputerProvider;
+ private final ChannelComputer<T> channelComputer;
- private transient AbstractChannelComputer<T> channelComputer;
-
- public BucketingStreamPartitioner(AbstractChannelComputer.Provider<T>
channelComputerProvider) {
- this.channelComputerProvider = channelComputerProvider;
+ public BucketingStreamPartitioner(ChannelComputer<T> channelComputer) {
+ this.channelComputer = channelComputer;
}
@Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
- channelComputer = channelComputerProvider.provide(numberOfChannels);
+ channelComputer.setup(numberOfChannels);
}
@Override
@@ -67,6 +65,6 @@ public class BucketingStreamPartitioner<T> extends
StreamPartitioner<T> {
@Override
public String toString() {
- return channelComputerProvider.toString();
+ return channelComputer.toString();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
similarity index 60%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
index 2ae41e577..fdef4c141 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
@@ -18,20 +18,16 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.schema.TableSchema;
+import java.io.Serializable;
-import org.apache.flink.table.data.RowData;
-
-/** {@link AbstractChannelComputer} for {@link RowData}. */
-public class RowDataChannelComputer extends AbstractChannelComputer<RowData> {
+/**
+ * A utility class to compute which downstream channel a given record should
be sent to.
+ *
+ * @param <T> type of record
+ */
+public interface ChannelComputer<T> extends Serializable {
- public RowDataChannelComputer(
- int numChannels, TableSchema schema, boolean
shuffleByPartitionEnable) {
- super(numChannels, new RowDataKeyAndBucketExtractor(schema),
shuffleByPartitionEnable);
- }
+ void setup(int numChannels);
- @Override
- public int channel(RowData record) {
- return channelImpl(record);
- }
+ int channel(T record);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index ee26918a9..45975f219 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -39,8 +39,8 @@ public class CompactorSink extends FlinkSink<RowData> {
@Override
protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
- StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
- return new StoreCompactOperator(table, writeProvider, isStreaming);
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming, String
commitUser) {
+ return new StoreCompactOperator(table, writeProvider, isStreaming,
commitUser);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index b8d5bf2af..cd2ff68d1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -53,8 +53,8 @@ public class FileStoreSink extends FlinkSink<RowData> {
@Override
protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
- StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
- return new RowDataStoreWriteOperator(table, logSinkFunction,
writeProvider);
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming, String
commitUser) {
+ return new RowDataStoreWriteOperator(table, logSinkFunction,
writeProvider, commitUser);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f0fe07081..d0a456d26 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -61,48 +61,46 @@ public abstract class FlinkSink<T> implements Serializable {
this.isOverwrite = isOverwrite;
}
- protected StoreSinkWrite.Provider createWriteProvider(String
initialCommitUser) {
+ private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig
checkpointConfig) {
boolean waitCompaction;
if (table.coreOptions().writeOnly()) {
waitCompaction = false;
} else {
Options options = table.coreOptions().toConfiguration();
ChangelogProducer changelogProducer =
table.coreOptions().changelogProducer();
- if (changelogProducer == ChangelogProducer.FULL_COMPACTION
- &&
options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
+ waitCompaction =
+ changelogProducer == ChangelogProducer.LOOKUP
+ && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+
+ int deltaCommits = -1;
+ if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
+ deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
+ } else if
(options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
long fullCompactionThresholdMs =
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
- return (table, context, ioManager) ->
- new FullChangelogStoreSinkWrite(
- table,
- context,
- initialCommitUser,
- ioManager,
- isOverwrite,
- fullCompactionThresholdMs);
+ deltaCommits =
+ (int)
+ (fullCompactionThresholdMs
+ /
checkpointConfig.getCheckpointInterval());
}
- waitCompaction =
- changelogProducer == ChangelogProducer.LOOKUP
- && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
- if (changelogProducer == ChangelogProducer.FULL_COMPACTION
- || options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
- int deltaCommits =
options.getOptional(FULL_COMPACTION_DELTA_COMMITS).orElse(1);
- return (table, context, ioManager) ->
+ if (changelogProducer == ChangelogProducer.FULL_COMPACTION ||
deltaCommits >= 0) {
+ int finalDeltaCommits = Math.max(deltaCommits, 1);
+ return (table, commitUser, state, ioManager) ->
new GlobalFullCompactionSinkWrite(
table,
- context,
- initialCommitUser,
+ commitUser,
+ state,
ioManager,
isOverwrite,
waitCompaction,
- deltaCommits);
+ finalDeltaCommits);
}
}
- return (table, context, ioManager) ->
+ return (table, commitUser, state, ioManager) ->
new StoreSinkWriteImpl(
- table, context, initialCommitUser, ioManager,
isOverwrite, waitCompaction);
+ table, commitUser, state, ioManager, isOverwrite,
waitCompaction);
}
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
@@ -112,7 +110,10 @@ public abstract class FlinkSink<T> implements Serializable
{
// When the job restarts, commitUser will be recovered from states and
this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
- return sinkFrom(input, initialCommitUser,
createWriteProvider(initialCommitUser));
+ return sinkFrom(
+ input,
+ initialCommitUser,
+
createWriteProvider(input.getExecutionEnvironment().getCheckpointConfig()));
}
public DataStreamSink<?> sinkFrom(
@@ -134,7 +135,7 @@ public abstract class FlinkSink<T> implements Serializable {
input.transform(
WRITER_NAME + " -> " + table.name(),
typeInfo,
- createWriteOperator(sinkProvider, isStreaming))
+ createWriteOperator(sinkProvider, isStreaming,
commitUser))
.setParallelism(input.getParallelism());
SingleOutputStreamOperator<?> committed =
@@ -165,7 +166,7 @@ public abstract class FlinkSink<T> implements Serializable {
}
protected abstract OneInputStreamOperator<T, Committable>
createWriteOperator(
- StoreSinkWrite.Provider writeProvider, boolean isStreaming);
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming, String
commitUser);
protected abstract SerializableFunction<String, Committer>
createCommitterFactory(
boolean streamingCheckpointEnabled);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index acdc76f33..c55c297f5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -19,9 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.operation.Lock;
-import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -85,14 +83,9 @@ public class FlinkSinkBuilder {
}
public DataStreamSink<?> build() {
- TableSchema schema = table.schema();
- boolean shuffleByPartitionEnable =
- table.coreOptions()
- .toConfiguration()
- .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION);
BucketingStreamPartitioner<RowData> partitioner =
new BucketingStreamPartitioner<>(
- new ChannelComputerProvider(schema,
shuffleByPartitionEnable));
+ new RowDataChannelComputer(table.schema(),
logSinkFunction != null));
PartitionTransformation<RowData> partitioned =
new PartitionTransformation<>(input.getTransformation(),
partitioner);
if (parallelism != null) {
@@ -106,32 +99,4 @@ public class FlinkSinkBuilder {
? sink.sinkFrom(new DataStream<>(env, partitioned),
commitUser, sinkProvider)
: sink.sinkFrom(new DataStream<>(env, partitioned));
}
-
- private static class ChannelComputerProvider
- implements AbstractChannelComputer.Provider<RowData> {
-
- private static final long serialVersionUID = 1L;
-
- private final TableSchema schema;
- private final boolean shuffleByPartitionEnable;
-
- private ChannelComputerProvider(TableSchema schema, boolean
shuffleByPartitionEnable) {
- this.schema = schema;
- this.shuffleByPartitionEnable = shuffleByPartitionEnable;
- }
-
- @Override
- public AbstractChannelComputer<RowData> provide(int numChannels) {
- return new RowDataChannelComputer(numChannels, schema,
shuffleByPartitionEnable);
- }
-
- @Override
- public String toString() {
- if (shuffleByPartitionEnable) {
- return "HASH[bucket, partition]";
- } else {
- return "HASH[bucket]";
- }
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
deleted file mode 100644
index 932a8bb01..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FullChangelogStoreSinkWrite.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.BinaryRowTypeSerializer;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.SinkRecord;
-import org.apache.paimon.utils.SnapshotManager;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * {@link StoreSinkWrite} for {@link
CoreOptions.ChangelogProducer#FULL_COMPACTION} changelog
- * producer. This writer will perform full compaction once in a while.
- *
- * @deprecated use {@link GlobalFullCompactionSinkWrite}.
- */
-public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
-
- private static final Logger LOG =
LoggerFactory.getLogger(FullChangelogStoreSinkWrite.class);
-
- private final SnapshotManager snapshotManager;
- private final long fullCompactionThresholdMs;
-
- private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
- private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>>
writtenBuckets;
- private final ListState<Tuple3<Long, BinaryRow, Integer>>
writtenBucketState;
-
- private Long currentFirstWriteMs;
- private final NavigableMap<Long, Long> firstWriteMs;
- private final ListState<Tuple2<Long, Long>> firstWriteMsState;
-
- private final TreeSet<Long> commitIdentifiersToCheck;
-
- public FullChangelogStoreSinkWrite(
- FileStoreTable table,
- StateInitializationContext context,
- String initialCommitUser,
- IOManager ioManager,
- boolean isOverwrite,
- long fullCompactionThresholdMs)
- throws Exception {
- super(table, context, initialCommitUser, ioManager, isOverwrite,
false);
-
- this.snapshotManager = table.snapshotManager();
- this.fullCompactionThresholdMs = fullCompactionThresholdMs;
-
- currentWrittenBuckets = new HashSet<>();
- TupleSerializer<Tuple3<Long, BinaryRow, Integer>>
writtenBucketStateSerializer =
- new TupleSerializer<>(
- (Class<Tuple3<Long, BinaryRow, Integer>>) (Class<?>)
Tuple3.class,
- new TypeSerializer[] {
- LongSerializer.INSTANCE,
- new BinaryRowTypeSerializer(
-
table.schema().logicalPartitionType().getFieldCount()),
- IntSerializer.INSTANCE
- });
- writtenBucketState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "paimon_written_buckets",
writtenBucketStateSerializer));
- writtenBuckets = new TreeMap<>();
- writtenBucketState
- .get()
- .forEach(
- t ->
- writtenBuckets
- .computeIfAbsent(t.f0, k -> new
HashSet<>())
- .add(Tuple2.of(t.f1, t.f2)));
-
- currentFirstWriteMs = null;
- TupleSerializer<Tuple2<Long, Long>> firstWriteMsStateSerializer =
- new TupleSerializer<>(
- (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
- new TypeSerializer[] {LongSerializer.INSTANCE,
LongSerializer.INSTANCE});
- firstWriteMsState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "first_write_ms",
firstWriteMsStateSerializer));
- firstWriteMs = new TreeMap<>();
- firstWriteMsState.get().forEach(t -> firstWriteMs.put(t.f0, t.f1));
-
- commitIdentifiersToCheck = new TreeSet<>();
- }
-
- @Override
- public SinkRecord write(InternalRow rowData) throws Exception {
- SinkRecord sinkRecord = super.write(rowData);
- touchBucket(sinkRecord.partition(), sinkRecord.bucket());
- return sinkRecord;
- }
-
- @Override
- public void compact(BinaryRow partition, int bucket, boolean
fullCompaction) throws Exception {
- super.compact(partition, bucket, fullCompaction);
- touchBucket(partition, bucket);
- }
-
- private void touchBucket(BinaryRow partition, int bucket) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("touch partition {}, bucket {}", partition, bucket);
- }
-
- // partition is a reused BinaryRow
- // we first check if the tuple exists to minimize copying
- if (!currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
- currentWrittenBuckets.add(Tuple2.of(partition.copy(), bucket));
- }
-
- if (currentFirstWriteMs == null) {
- currentFirstWriteMs = System.currentTimeMillis();
- }
- }
-
- @Override
- public List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
- throws IOException {
- checkSuccessfulFullCompaction();
-
- // check what buckets we've modified during this checkpoint interval
- if (!currentWrittenBuckets.isEmpty()) {
- writtenBuckets
- .computeIfAbsent(checkpointId, k -> new HashSet<>())
- .addAll(currentWrittenBuckets);
- currentWrittenBuckets.clear();
- firstWriteMs.putIfAbsent(checkpointId, currentFirstWriteMs);
- currentFirstWriteMs = null;
- }
-
- if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>>
checkpointIdAndBuckets :
- writtenBuckets.entrySet()) {
- LOG.debug(
- "Written buckets for checkpoint #{} are:",
checkpointIdAndBuckets.getKey());
- for (Tuple2<BinaryRow, Integer> bucket :
checkpointIdAndBuckets.getValue()) {
- LOG.debug(" * partition {}, bucket {}", bucket.f0,
bucket.f1);
- }
- }
- }
-
- if (!writtenBuckets.isEmpty() // there should be something to compact
- && System.currentTimeMillis() -
firstWriteMs.firstEntry().getValue()
- >= fullCompactionThresholdMs // time without full
compaction exceeds
- ) {
- doCompaction = true;
- }
-
- if (doCompaction) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Submit full compaction for checkpoint #{}",
checkpointId);
- }
- submitFullCompaction();
- commitIdentifiersToCheck.add(checkpointId);
- }
-
- return super.prepareCommit(doCompaction, checkpointId);
- }
-
- private void checkSuccessfulFullCompaction() {
- Long latestId = snapshotManager.latestSnapshotId();
- if (latestId == null) {
- return;
- }
- Long earliestId = snapshotManager.earliestSnapshotId();
- if (earliestId == null) {
- return;
- }
-
- for (long id = latestId; id >= earliestId; id--) {
- Snapshot snapshot = snapshotManager.snapshot(id);
- if (snapshot.commitUser().equals(commitUser)
- && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
- long commitIdentifier = snapshot.commitIdentifier();
- if (commitIdentifiersToCheck.contains(commitIdentifier)) {
- // We found a full compaction snapshot triggered by
`submitFullCompaction`
- // method.
- //
- // Because `submitFullCompaction` will compact all buckets
in `writtenBuckets`,
- // thus a successful commit indicates that all previous
buckets have been
- // compacted.
- //
- // We must make sure that the compact snapshot is
triggered by
- // `submitFullCompaction`, because normal compaction may
also trigger full
- // compaction, but that only compacts a specific bucket,
not all buckets
- // recorded in `writtenBuckets`.
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Found full compaction snapshot #{} with
identifier {}",
- id,
- commitIdentifier);
- }
- writtenBuckets.headMap(commitIdentifier, true).clear();
- firstWriteMs.headMap(commitIdentifier, true).clear();
- commitIdentifiersToCheck.headSet(commitIdentifier).clear();
- break;
- }
- }
- }
- }
-
- private void submitFullCompaction() {
- Set<Tuple2<BinaryRow, Integer>> compactedBuckets = new HashSet<>();
- writtenBuckets.forEach(
- (checkpointId, buckets) -> {
- for (Tuple2<BinaryRow, Integer> bucket : buckets) {
- if (compactedBuckets.contains(bucket)) {
- continue;
- }
- compactedBuckets.add(bucket);
- try {
- write.compact(bucket.f0, bucket.f1, true);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
-
- List<Tuple3<Long, BinaryRow, Integer>> writtenBucketList = new
ArrayList<>();
- for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry :
writtenBuckets.entrySet()) {
- for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) {
- writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0,
bucket.f1));
- }
- }
- writtenBucketState.update(writtenBucketList);
-
- List<Tuple2<Long, Long>> firstWriteMsList = new ArrayList<>();
- for (Map.Entry<Long, Long> entry : firstWriteMs.entrySet()) {
- firstWriteMsList.add(Tuple2.of(entry.getKey(), entry.getValue()));
- }
- firstWriteMsState.update(firstWriteMsList);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index ac666ff12..68bc682ec 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -21,22 +21,12 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.BinaryRowTypeSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,53 +52,41 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
private final int deltaCommits;
+ private final String tableName;
private final SnapshotManager snapshotManager;
private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;
private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>>
writtenBuckets;
- private final ListState<Tuple3<Long, BinaryRow, Integer>>
writtenBucketState;
+ private static final String WRITTEN_BUCKETS_STATE_NAME =
"paimon_written_buckets";
private final TreeSet<Long> commitIdentifiersToCheck;
public GlobalFullCompactionSinkWrite(
FileStoreTable table,
- StateInitializationContext context,
- String initialCommitUser,
+ String commitUser,
+ StoreSinkWriteState state,
IOManager ioManager,
boolean isOverwrite,
boolean waitCompaction,
- int deltaCommits)
- throws Exception {
- super(table, context, initialCommitUser, ioManager, isOverwrite,
waitCompaction);
+ int deltaCommits) {
+ super(table, commitUser, state, ioManager, isOverwrite,
waitCompaction);
this.deltaCommits = deltaCommits;
+ this.tableName = table.name();
this.snapshotManager = table.snapshotManager();
currentWrittenBuckets = new HashSet<>();
- @SuppressWarnings("unchecked")
- TupleSerializer<Tuple3<Long, BinaryRow, Integer>>
writtenBucketStateSerializer =
- new TupleSerializer<>(
- (Class<Tuple3<Long, BinaryRow, Integer>>) (Class<?>)
Tuple3.class,
- new TypeSerializer[] {
- LongSerializer.INSTANCE,
- new BinaryRowTypeSerializer(
-
table.schema().logicalPartitionType().getFieldCount()),
- IntSerializer.INSTANCE
- });
- writtenBucketState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "paimon_written_buckets",
writtenBucketStateSerializer));
writtenBuckets = new TreeMap<>();
- writtenBucketState
- .get()
- .forEach(
- t ->
- writtenBuckets
- .computeIfAbsent(t.f0, k -> new
HashSet<>())
- .add(Tuple2.of(t.f1, t.f2)));
+ List<StoreSinkWriteState.StateValue> writtenBucketStateValues =
+ state.get(tableName, WRITTEN_BUCKETS_STATE_NAME);
+ if (writtenBucketStateValues != null) {
+ for (StoreSinkWriteState.StateValue stateValue :
writtenBucketStateValues) {
+ writtenBuckets
+ .computeIfAbsent(bytesToLong(stateValue.value()), k ->
new HashSet<>())
+ .add(Tuple2.of(stateValue.partition(),
stateValue.bucket()));
+ }
+ }
commitIdentifiersToCheck = new TreeSet<>();
}
@@ -240,15 +218,35 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
}
@Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
+ public void snapshotState() throws Exception {
+ super.snapshotState();
- List<Tuple3<Long, BinaryRow, Integer>> writtenBucketList = new
ArrayList<>();
+ List<StoreSinkWriteState.StateValue> writtenBucketList = new
ArrayList<>();
for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry :
writtenBuckets.entrySet()) {
for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) {
- writtenBucketList.add(Tuple3.of(entry.getKey(), bucket.f0,
bucket.f1));
+ writtenBucketList.add(
+ new StoreSinkWriteState.StateValue(
+ bucket.f0, bucket.f1,
longToBytes(entry.getKey())));
}
}
- writtenBucketState.update(writtenBucketList);
+ state.put(tableName, WRITTEN_BUCKETS_STATE_NAME, writtenBucketList);
+ }
+
+ private static byte[] longToBytes(long l) {
+ byte[] result = new byte[8];
+ for (int i = 7; i >= 0; i--) {
+ result[i] = (byte) (l & 0xFF);
+ l >>= 8;
+ }
+ return result;
+ }
+
+ private static long bytesToLong(final byte[] b) {
+ long result = 0;
+ for (int i = 0; i < 8; i++) {
+ result <<= 8;
+ result |= (b[i] & 0xFF);
+ }
+ return result;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 2ae41e577..85e7ebce7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -18,20 +18,54 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.flink.table.data.RowData;
-/** {@link AbstractChannelComputer} for {@link RowData}. */
-public class RowDataChannelComputer extends AbstractChannelComputer<RowData> {
+/** {@link ChannelComputer} for {@link RowData}. */
+public class RowDataChannelComputer implements ChannelComputer<RowData> {
- public RowDataChannelComputer(
- int numChannels, TableSchema schema, boolean
shuffleByPartitionEnable) {
- super(numChannels, new RowDataKeyAndBucketExtractor(schema),
shuffleByPartitionEnable);
+ private static final long serialVersionUID = 1L;
+
+ private final TableSchema schema;
+ private final boolean hasLogSink;
+
+ private transient int numChannels;
+ private transient KeyAndBucketExtractor<RowData> extractor;
+
+ public RowDataChannelComputer(TableSchema schema, boolean hasLogSink) {
+ this.schema = schema;
+ this.hasLogSink = hasLogSink;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.extractor = new RowDataKeyAndBucketExtractor(schema);
}
@Override
public int channel(RowData record) {
- return channelImpl(record);
+ extractor.setRecord(record);
+ return channel(extractor.partition(), extractor.bucket());
+ }
+
+ public int channel(BinaryRow partition, int bucket) {
+ int startChannel;
+ if (hasLogSink) {
+ // log sinks like Kafka only consider bucket and don't care about
partition
+ // so same bucket, even from different partition, must go to the
same channel
+ startChannel = 0;
+ } else {
+ startChannel = Math.abs(partition.hashCode()) % numChannels;
+ }
+ return (startChannel + bucket) % numChannels;
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by bucket";
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 55a6b8375..16f5f98f3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -19,34 +19,243 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.List;
-/**
- * An {@link AbstractStoreWriteOperator} which accepts Flink's {@link
RowData}. The schema of its
- * writer never changes.
- */
-public class RowDataStoreWriteOperator extends
AbstractStoreWriteOperator<RowData> {
+/** A {@link PrepareCommitOperator} to write {@link RowData}. Record schema is
fixed. */
+public class RowDataStoreWriteOperator extends PrepareCommitOperator<RowData> {
+
+ private static final long serialVersionUID = 3L;
+
+ private final FileStoreTable table;
+ @Nullable private final LogSinkFunction logSinkFunction;
+ private final StoreSinkWrite.Provider storeSinkWriteProvider;
+ private final String initialCommitUser;
+
+ private transient StoreSinkWriteState state;
+ private transient StoreSinkWrite write;
+ private transient SimpleContext sinkContext;
+ @Nullable private transient LogWriteCallback logCallback;
+
+ /** We listen to this ourselves because we don't have an {@link
InternalTimerService}. */
+ private long currentWatermark = Long.MIN_VALUE;
public RowDataStoreWriteOperator(
FileStoreTable table,
@Nullable LogSinkFunction logSinkFunction,
- StoreSinkWrite.Provider storeSinkWriteProvider) {
- super(table, logSinkFunction, storeSinkWriteProvider);
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser) {
+ this.table = table;
+ this.logSinkFunction = logSinkFunction;
+ this.storeSinkWriteProvider = storeSinkWriteProvider;
+ this.initialCommitUser = initialCommitUser;
}
@Override
- protected SinkRecord processRecord(RowData record) throws Exception {
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Committable>> output) {
+ super.setup(containingTask, config, output);
+ if (logSinkFunction != null) {
+ FunctionUtils.setFunctionRuntimeContext(logSinkFunction,
getRuntimeContext());
+ }
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+
+ // Each job can only have one user name and this name must be
consistent across restarts.
+ // We cannot use job id as commit user name here because user may
change job id by creating
+ // a savepoint, stop the job and then resume from savepoint.
+ String commitUser =
+ StateUtils.getSingleValueFromState(
+ context, "commit_user_state", String.class,
initialCommitUser);
+
+ RowDataChannelComputer channelComputer =
+ new RowDataChannelComputer(table.schema(), logSinkFunction !=
null);
+
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
+ state =
+ new StoreSinkWriteState(
+ context,
+ (tableName, partition, bucket) ->
+ channelComputer.channel(partition, bucket)
+ ==
getRuntimeContext().getIndexOfThisSubtask());
+
+ write =
+ storeSinkWriteProvider.provide(
+ table,
+ commitUser,
+ state,
+ getContainingTask().getEnvironment().getIOManager());
+
+ if (logSinkFunction != null) {
+ StreamingFunctionUtils.restoreFunctionState(context,
logSinkFunction);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ this.sinkContext = new SimpleContext(getProcessingTimeService());
+ if (logSinkFunction != null) {
+ FunctionUtils.openFunction(logSinkFunction, new Configuration());
+ logCallback = new LogWriteCallback();
+ logSinkFunction.setWriteCallback(logCallback);
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+
+ this.currentWatermark = mark.getTimestamp();
+ if (logSinkFunction != null) {
+ logSinkFunction.writeWatermark(
+ new
org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception
{
+ sinkContext.timestamp = element.hasTimestamp() ?
element.getTimestamp() : null;
+
+ SinkRecord record;
try {
- return write.write(new FlinkRowWrapper(record));
+ record = write.write(new FlinkRowWrapper(element.getValue()));
} catch (Exception e) {
throw new IOException(e);
}
+
+ if (logSinkFunction != null) {
+ // write to log store, need to preserve original pk (which
includes partition fields)
+ SinkRecord logRecord = write.toLogRecord(record);
+ logSinkFunction.invoke(logRecord, sinkContext);
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+
+ write.snapshotState();
+ state.snapshotState();
+
+ if (logSinkFunction != null) {
+ StreamingFunctionUtils.snapshotFunctionState(
+ context, getOperatorStateBackend(), logSinkFunction);
+ }
+ }
+
+ @Override
+ public void finish() throws Exception {
+ super.finish();
+
+ if (logSinkFunction != null) {
+ logSinkFunction.finish();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ write.close();
+ if (logSinkFunction != null) {
+ FunctionUtils.closeFunction(logSinkFunction);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+
+ if (logSinkFunction instanceof CheckpointListener) {
+ ((CheckpointListener)
logSinkFunction).notifyCheckpointComplete(checkpointId);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ super.notifyCheckpointAborted(checkpointId);
+
+ if (logSinkFunction instanceof CheckpointListener) {
+ ((CheckpointListener)
logSinkFunction).notifyCheckpointAborted(checkpointId);
+ }
+ }
+
+ @Override
+ protected List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
+ throws IOException {
+ List<Committable> committables = write.prepareCommit(doCompaction,
checkpointId);
+
+ if (logCallback != null) {
+ try {
+ logSinkFunction.flush();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ logCallback
+ .offsets()
+ .forEach(
+ (k, v) ->
+ committables.add(
+ new Committable(
+ checkpointId,
+
Committable.Kind.LOG_OFFSET,
+ new
LogOffsetCommittable(k, v))));
+ }
+
+ return committables;
+ }
+
+ private class SimpleContext implements SinkFunction.Context {
+
+ @Nullable private Long timestamp;
+
+ private final ProcessingTimeService processingTimeService;
+
+ public SimpleContext(ProcessingTimeService processingTimeService) {
+ this.processingTimeService = processingTimeService;
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+
+ @Override
+ public long currentWatermark() {
+ return currentWatermark;
+ }
+
+ @Override
+ public Long timestamp() {
+ return timestamp;
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
index 53756a43b..934aceeca 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
@@ -39,22 +39,12 @@ public class StateUtils {
throws Exception {
ListState<T> state =
context.getOperatorStateStore()
- .getListState(new ListStateDescriptor<>(stateName,
valueClass));
+ .getUnionListState(new
ListStateDescriptor<>(stateName, valueClass));
List<T> values = new ArrayList<>();
state.get().forEach(values::add);
if (context.isRestored()) {
- // Values may contain 0 element or more than 1 element.
- //
- // Let's say a vertex has 3 tasks (A, B and C). If A is finished
while B and C are still
- // running, then states of A will be divided between B and C. That
is, if the job
- // restarts, state of vertex A will be empty, and state of vertex
B and C may contain
- // more than 1 element.
- if (values.isEmpty()) {
- return null;
- }
-
// As we're storing the same value for each task, we hereby check
if all elements are
// equal.
for (int i = 1; i < values.size(); i++) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 89ab1e721..156b3a798 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.OffsetRow;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
@@ -47,7 +48,9 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
private final FileStoreTable table;
private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final boolean isStreaming;
+ private final String initialCommitUser;
+ private transient StoreSinkWriteState state;
private transient StoreSinkWrite write;
private transient InternalRowSerializer partitionSerializer;
private transient OffsetRow reusedPartition;
@@ -56,21 +59,43 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
public StoreCompactOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
- boolean isStreaming) {
+ boolean isStreaming,
+ String initialCommitUser) {
Preconditions.checkArgument(
!table.coreOptions().writeOnly(),
CoreOptions.WRITE_ONLY.key() + " should not be true for
StoreCompactOperator.");
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.isStreaming = isStreaming;
+ this.initialCommitUser = initialCommitUser;
}
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
+
+ // Each job can only have one user name and this name must be
consistent across restarts.
+ // We cannot use job id as commit user name here because user may
change job id by creating
+ // a savepoint, stop the job and then resume from savepoint.
+ String commitUser =
+ StateUtils.getSingleValueFromState(
+ context, "commit_user_state", String.class,
initialCommitUser);
+
+ RowDataChannelComputer channelComputer = new
RowDataChannelComputer(table.schema(), false);
+
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
+ state =
+ new StoreSinkWriteState(
+ context,
+ (tableName, partition, bucket) ->
+ channelComputer.channel(partition, bucket)
+ ==
getRuntimeContext().getIndexOfThisSubtask());
+
write =
storeSinkWriteProvider.provide(
- table, context,
getContainingTask().getEnvironment().getIOManager());
+ table,
+ commitUser,
+ state,
+ getContainingTask().getEnvironment().getIOManager());
}
@Override
@@ -112,4 +137,17 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
throws IOException {
return write.prepareCommit(doCompaction, checkpointId);
}
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ write.snapshotState();
+ state.snapshotState();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ write.close();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 4e3e3a60a..0eae74e00 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -26,15 +26,13 @@ import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
-/** Helper class of {@link AbstractStoreWriteOperator} for different types of
paimon sinks. */
+/** Helper class of {@link PrepareCommitOperator} for different types of
paimon sinks. */
public interface StoreSinkWrite {
SinkRecord write(InternalRow rowData) throws Exception;
@@ -47,7 +45,7 @@ public interface StoreSinkWrite {
List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException;
- void snapshotState(StateSnapshotContext context) throws Exception;
+ void snapshotState() throws Exception;
void close() throws Exception;
@@ -68,7 +66,9 @@ public interface StoreSinkWrite {
interface Provider extends Serializable {
+ /**
+ * Creates a {@link StoreSinkWrite} for {@code table}.
+ *
+ * @param table table to write to
+ * @param commitUser commit user already resolved by the calling operator (for example
+ * restored from operator state)
+ * @param state shared state of the calling operator; the returned writer may read from and
+ * write to it
+ * @param ioManager IO manager of the task containing the calling operator
+ */
StoreSinkWrite provide(
- FileStoreTable table, StateInitializationContext context,
IOManager ioManager)
- throws Exception;
+ FileStoreTable table,
+ String commitUser,
+ StoreSinkWriteState state,
+ IOManager ioManager);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index e5988cca9..67f6ddab1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -29,8 +29,6 @@ import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,42 +43,26 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
private static final Logger LOG =
LoggerFactory.getLogger(StoreSinkWriteImpl.class);
protected final String commitUser;
+ protected final StoreSinkWriteState state;
private final boolean waitCompaction;
protected TableWriteImpl<?> write;
+ /**
+ * Creates a writer that writes to {@code table} as {@code commitUser}.
+ *
+ * <p>The commit user is not read from operator state here; callers resolve it and pass it in.
+ *
+ * <p>NOTE(review): the previous implementation tolerated a null commit user (restored state
+ * could be null when the upstream of this subtask had already finished) by leaving {@code
+ * write} null. This constructor always calls {@code table.newWrite(commitUser)}. Confirm that
+ * callers never pass null here.
+ *
+ * @param table table to write to
+ * @param commitUser commit user passed to {@code table.newWrite}
+ * @param state shared sink write state, kept in the protected {@code state} field
+ * @param ioManager its spilling directories back the table write's IO manager
+ * @param isOverwrite passed to {@code withOverwrite}
+ * @param waitCompaction stored in the {@code waitCompaction} field
+ */
public StoreSinkWriteImpl(
FileStoreTable table,
- StateInitializationContext context,
- String initialCommitUser,
+ String commitUser,
+ StoreSinkWriteState state,
IOManager ioManager,
boolean isOverwrite,
- boolean waitCompaction)
- throws Exception {
- // Each job can only have one user name and this name must be
consistent across restarts.
- // We cannot use job id as commit user name here because user may
change job id by creating
- // a savepoint, stop the job and then resume from savepoint.
- commitUser =
- StateUtils.getSingleValueFromState(
- context, "commit_user_state", String.class,
initialCommitUser);
-
- // State will be null if the upstream of this subtask has finished,
but some other subtasks
- // are still running.
- // See comments of StateUtils.getSingleValueFromState for more detail.
- //
- // If the state is null, no new records will come. We only need to
deal with checkpoints and
- // close events.
- if (commitUser == null) {
- write = null;
- } else {
- write =
- table.newWrite(commitUser)
- .withIOManager(
- new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
- .withOverwrite(isOverwrite);
- }
-
+ boolean waitCompaction) {
+ this.commitUser = commitUser;
+ this.state = state;
this.waitCompaction = waitCompaction;
+
+ write =
+ table.newWrite(commitUser)
+ .withIOManager(new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+ .withOverwrite(isOverwrite);
}
@Override
@@ -131,7 +113,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
}
@Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
+ public void snapshotState() throws Exception {
+ // This implementation has nothing to put into the shared StoreSinkWriteState before it is
+ // snapshotted; subclasses that keep per-bucket data may override this.
// do nothing
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
new file mode 100644
index 000000000..feff9e9dd
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * States shared by the {@link StoreSinkWrite}s of one sink operator.
+ *
+ * <p>Values are grouped first by table name and then by key name. An instance should be created
+ * in a sink operator while its state is initialized, and then handed to the {@link
+ * StoreSinkWrite}.
+ *
+ * <p>All values are stored in a single union list state, so after a restore every subtask sees
+ * every stored value; a {@link StateValueFilter} decides which of them this subtask keeps.
+ */
+public class StoreSinkWriteState {
+
+    // Each element is (table name, key, serialized partition, bucket, value).
+    private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> listState;
+    // table name -> key -> values kept by this subtask.
+    private final Map<String, Map<String, List<StateValue>>> map;
+
+    public StoreSinkWriteState(
+            StateInitializationContext context, StateValueFilter stateValueFilter)
+            throws Exception {
+        listState =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>(
+                                        "paimon_store_sink_write_state",
+                                        createListStateSerializer()));
+
+        map = new HashMap<>();
+        for (Tuple5<String, String, byte[], Integer, byte[]> element : listState.get()) {
+            String tableName = element.f0;
+            String key = element.f1;
+            BinaryRow partition = SerializationUtils.deserializeBinaryRow(element.f2);
+            int bucket = element.f3;
+            if (!stateValueFilter.filter(tableName, partition, bucket)) {
+                // Union list state hands every value to every subtask; skip foreign ones.
+                continue;
+            }
+            map.computeIfAbsent(tableName, ignored -> new HashMap<>())
+                    .computeIfAbsent(key, ignored -> new ArrayList<>())
+                    .add(new StateValue(partition, bucket, element.f4));
+        }
+    }
+
+    /** Builds the serializer for the (table, key, partition, bucket, value) tuples. */
+    @SuppressWarnings("unchecked")
+    private static TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>>
+            createListStateSerializer() {
+        TypeSerializer<?>[] fieldSerializers =
+                new TypeSerializer[] {
+                    StringSerializer.INSTANCE,
+                    StringSerializer.INSTANCE,
+                    BytePrimitiveArraySerializer.INSTANCE,
+                    IntSerializer.INSTANCE,
+                    BytePrimitiveArraySerializer.INSTANCE
+                };
+        return new TupleSerializer<>(
+                (Class<Tuple5<String, String, byte[], Integer, byte[]>>) (Class<?>) Tuple5.class,
+                fieldSerializers);
+    }
+
+    /** Returns the values stored under {@code tableName} and {@code key}, or null if none. */
+    public @Nullable List<StateValue> get(String tableName, String key) {
+        Map<String, List<StateValue>> valuesByKey = map.get(tableName);
+        if (valuesByKey == null) {
+            return null;
+        }
+        return valuesByKey.get(key);
+    }
+
+    /** Replaces the values stored under {@code tableName} and {@code key}. */
+    public void put(String tableName, String key, List<StateValue> stateValues) {
+        Map<String, List<StateValue>> valuesByKey =
+                map.computeIfAbsent(tableName, ignored -> new HashMap<>());
+        valuesByKey.put(key, stateValues);
+    }
+
+    /** Writes every value currently held by this subtask into the union list state. */
+    public void snapshotState() throws Exception {
+        List<Tuple5<String, String, byte[], Integer, byte[]>> elements = new ArrayList<>();
+        for (Map.Entry<String, Map<String, List<StateValue>>> tableEntry : map.entrySet()) {
+            String tableName = tableEntry.getKey();
+            for (Map.Entry<String, List<StateValue>> keyEntry :
+                    tableEntry.getValue().entrySet()) {
+                String key = keyEntry.getKey();
+                for (StateValue stateValue : keyEntry.getValue()) {
+                    byte[] partitionBytes =
+                            SerializationUtils.serializeBinaryRow(stateValue.partition());
+                    elements.add(
+                            Tuple5.of(
+                                    tableName,
+                                    key,
+                                    partitionBytes,
+                                    stateValue.bucket(),
+                                    stateValue.value()));
+                }
+            }
+        }
+        listState.update(elements);
+    }
+
+    /**
+     * A single value stored for a {@link StoreSinkWrite}.
+     *
+     * <p>Every value carries its partition and bucket, which is what allows values to be
+     * reassigned to subtasks when the sink parallelism changes.
+     */
+    public static class StateValue {
+
+        private final BinaryRow partition;
+        private final int bucket;
+        private final byte[] value;
+
+        public StateValue(BinaryRow partition, int bucket, byte[] value) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.value = value;
+        }
+
+        public BinaryRow partition() {
+            return partition;
+        }
+
+        public int bucket() {
+            return bucket;
+        }
+
+        public byte[] value() {
+            return value;
+        }
+    }
+
+    /**
+     * Decides, from the table name, partition and bucket of a restored {@link StateValue},
+     * whether the current subtask keeps it.
+     */
+    public interface StateValueFilter {
+
+        boolean filter(String tableName, BinaryRow partition, int bucket);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
index 8a815bb69..c15340a74 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
@@ -18,19 +18,44 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.flink.sink.AbstractChannelComputer;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.ChannelComputer;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-/** {@link AbstractChannelComputer} for {@link CdcRecord}. */
-public class CdcRecordChannelComputer extends
AbstractChannelComputer<CdcRecord> {
+/**
+ * {@link ChannelComputer} for {@link CdcRecord}.
+ *
+ * <p>Records are routed by their partition and bucket, so all records of one bucket go to the
+ * same channel. {@link #channel(BinaryRow, int)} is also usable on its own, e.g. to decide
+ * which restored state values a sink subtask should keep.
+ */
+public class CdcRecordChannelComputer implements ChannelComputer<CdcRecord> {
-    public CdcRecordChannelComputer(
-            int numChannels, TableSchema schema, boolean shuffleByPartitionEnable) {
-        super(numChannels, new CdcRecordKeyAndBucketExtractor(schema), shuffleByPartitionEnable);
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels;
+    private transient KeyAndBucketExtractor<CdcRecord> extractor;
+
+    public CdcRecordChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        // The extractor field is transient, so it is built here rather than serialized.
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
     }
     @Override
     public int channel(CdcRecord record) {
-        return channelImpl(record);
+        extractor.setRecord(record);
+        return channel(extractor.partition(), extractor.bucket());
+    }
+
+    /**
+     * Computes the channel for a partition and bucket.
+     *
+     * <p>Buckets of one partition are assigned to consecutive channels starting from an offset
+     * derived from the partition's hash code. {@link #setup(int)} must have been called first.
+     *
+     * @param partition partition of the record or state value
+     * @param bucket bucket of the record or state value
+     * @return a channel index in {@code [0, numChannels)} for non-negative buckets
+     */
+    public int channel(BinaryRow partition, int bucket) {
+        // Take the remainder before Math.abs: Math.abs(Integer.MIN_VALUE) is still negative, so
+        // abs-then-mod could produce a negative channel. For every other hash value both orders
+        // give the same result, so routing is otherwise unchanged.
+        int startChannel = Math.abs(partition.hashCode() % numChannels);
+        return (startChannel + bucket) % numChannels;
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by bucket";
     }
 }
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
new file mode 100644
index 000000000..df45d8281
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.PrepareCommitOperator;
+import org.apache.paimon.flink.sink.StateUtils;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteState;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A {@link PrepareCommitOperator} to write {@link CdcRecord}s whose schema may change.
+ *
+ * <p>If the currently known table schema does not fit a record, this operator repeatedly reloads
+ * the latest schema, sleeping {@code cdc.retry-sleep-time} between attempts, until it fits. It
+ * then rebuilds its table write on the new schema.
+ */
+public class CdcRecordStoreWriteOperator extends PrepareCommitOperator<CdcRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    static final ConfigOption<Duration> RETRY_SLEEP_TIME =
+            ConfigOptions.key("cdc.retry-sleep-time")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(500));
+
+    // Not final: reassigned by copyWithLatestSchema() when the schema evolves.
+    private FileStoreTable table;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
+    private final String initialCommitUser;
+    private final long retrySleepMillis;
+
+    // Assigned in initializeState(); may still be null in close() if initialization failed.
+    private transient StoreSinkWriteState state;
+    private transient StoreSinkWrite write;
+
+    /**
+     * Creates the operator.
+     *
+     * @param table table to write to; its latest schema is loaded when state is initialized
+     * @param storeSinkWriteProvider creates the {@link StoreSinkWrite} used by this operator
+     * @param initialCommitUser passed to {@code StateUtils.getSingleValueFromState} together
+     *     with the {@code commit_user_state} state name
+     */
+    public CdcRecordStoreWriteOperator(
+            FileStoreTable table,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
+            String initialCommitUser) {
+        this.table = table;
+        this.storeSinkWriteProvider = storeSinkWriteProvider;
+        this.initialCommitUser = initialCommitUser;
+        this.retrySleepMillis =
+                table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Each job can only have one user name and this name must be consistent across restarts.
+        // We cannot use job id as commit user name here because user may change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        String commitUser =
+                StateUtils.getSingleValueFromState(
+                        context, "commit_user_state", String.class, initialCommitUser);
+
+        // Keep only the restored state values whose (partition, bucket) maps to this subtask.
+        // CdcRecordChannelComputer is also what FlinkCdcSinkBuilder uses to shuffle records, so
+        // state values presumably land on the subtask that receives their bucket's records.
+        int numberOfSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        CdcRecordChannelComputer channelComputer = new CdcRecordChannelComputer(table.schema());
+        channelComputer.setup(numberOfSubtasks);
+        state =
+                new StoreSinkWriteState(
+                        context,
+                        (tableName, partition, bucket) ->
+                                channelComputer.channel(partition, bucket) == subtaskIndex);
+
+        table = table.copyWithLatestSchema();
+        write =
+                storeSinkWriteProvider.provide(
+                        table,
+                        commitUser,
+                        state,
+                        getContainingTask().getEnvironment().getIOManager());
+    }
+
+    @Override
+    public void processElement(StreamRecord<CdcRecord> element) throws Exception {
+        CdcRecord record = element.getValue();
+        Optional<GenericRow> optionalConverted = record.toGenericRow(table.schema().fields());
+        if (!optionalConverted.isPresent()) {
+            // The record does not fit the known schema: poll for the latest schema until it
+            // does, then rebuild the table write on that schema.
+            while (true) {
+                table = table.copyWithLatestSchema();
+                optionalConverted = record.toGenericRow(table.schema().fields());
+                if (optionalConverted.isPresent()) {
+                    break;
+                }
+                Thread.sleep(retrySleepMillis);
+            }
+            write.replace(commitUser -> table.newWrite(commitUser));
+        }
+
+        try {
+            write.write(optionalConverted.get());
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // The writer goes first so anything it puts into the shared state is persisted below.
+        write.snapshotState();
+        state.snapshotState();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        // write is null if the operator failed before initializeState() created it; skipping it
+        // keeps that original failure from being masked by a NullPointerException.
+        if (write != null) {
+            write.close();
+        }
+    }
+
+    @Override
+    protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+            throws IOException {
+        return write.prepareCommit(doCompaction, checkpointId);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 24ddd9e56..1deda066e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.FlinkSink;
-import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
@@ -34,8 +33,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.util.function.SerializableFunction;
-import javax.annotation.Nullable;
-
/**
* A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema
change if necessary.
*/
@@ -44,21 +41,16 @@ public class FlinkCdcSink extends FlinkSink<CdcRecord> {
private static final long serialVersionUID = 1L;
private final Lock.Factory lockFactory;
- @Nullable private final LogSinkFunction logSinkFunction;
- public FlinkCdcSink(
- FileStoreTable table,
- Lock.Factory lockFactory,
- @Nullable LogSinkFunction logSinkFunction) {
+ /**
+ * Creates a CDC sink for {@code table}. Unlike the previous signature, no log sink function
+ * is accepted, so this sink does not write to a log system.
+ *
+ * <p>NOTE(review): the meaning of the {@code false} flag passed to {@link FlinkSink} is not
+ * visible here (presumably "not overwrite") — confirm.
+ *
+ * @param table table to write to
+ * @param lockFactory stored in the {@code lockFactory} field
+ */
+ public FlinkCdcSink(FileStoreTable table, Lock.Factory lockFactory) {
super(table, false);
this.lockFactory = lockFactory;
- this.logSinkFunction = logSinkFunction;
}
@Override
protected OneInputStreamOperator<CdcRecord, Committable>
createWriteOperator(
- StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
- return new SchemaAwareStoreWriteOperator(table, logSinkFunction,
writeProvider);
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming, String
commitUser) {
+ // isStreaming is not used by the CDC write operator. commitUser is only the initial value;
+ // CdcRecordStoreWriteOperator looks up the commit user in its "commit_user_state" state.
+ return new CdcRecordStoreWriteOperator(table, writeProvider,
commitUser);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
index 4e770e57c..6663acc54 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -18,14 +18,10 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.AbstractChannelComputer;
import org.apache.paimon.flink.sink.BucketingStreamPartitioner;
-import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
@@ -48,7 +44,6 @@ public class FlinkCdcSinkBuilder<T> {
private EventParser.Factory<T> parserFactory = null;
private FileStoreTable table = null;
private Lock.Factory lockFactory = Lock.emptyFactory();
- @Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
@@ -72,11 +67,6 @@ public class FlinkCdcSinkBuilder<T> {
return this;
}
- public FlinkCdcSinkBuilder<T> withLogSinkFunction(@Nullable
LogSinkFunction logSinkFunction) {
- this.logSinkFunction = logSinkFunction;
- return this;
- }
-
public FlinkCdcSinkBuilder<T> withParallelism(@Nullable Integer
parallelism) {
this.parallelism = parallelism;
return this;
@@ -100,14 +90,8 @@ public class FlinkCdcSinkBuilder<T> {
new SchemaManager(table.fileIO(),
table.location())));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
- TableSchema schema = table.schema();
- boolean shuffleByPartitionEnable =
- table.coreOptions()
- .toConfiguration()
- .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION);
BucketingStreamPartitioner<CdcRecord> partitioner =
- new BucketingStreamPartitioner<>(
- new ChannelComputerProvider(schema,
shuffleByPartitionEnable));
+ new BucketingStreamPartitioner<>(new
CdcRecordChannelComputer(table.schema()));
PartitionTransformation<CdcRecord> partitioned =
new PartitionTransformation<>(parsed.getTransformation(),
partitioner);
if (parallelism != null) {
@@ -115,35 +99,7 @@ public class FlinkCdcSinkBuilder<T> {
}
StreamExecutionEnvironment env = input.getExecutionEnvironment();
- FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory,
logSinkFunction);
+ FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory);
return sink.sinkFrom(new DataStream<>(env, partitioned));
}
-
- private static class ChannelComputerProvider
- implements AbstractChannelComputer.Provider<CdcRecord> {
-
- private static final long serialVersionUID = 1L;
-
- private final TableSchema schema;
- private final boolean shuffleByPartitionEnable;
-
- private ChannelComputerProvider(TableSchema schema, boolean
shuffleByPartitionEnable) {
- this.schema = schema;
- this.shuffleByPartitionEnable = shuffleByPartitionEnable;
- }
-
- @Override
- public AbstractChannelComputer<CdcRecord> provide(int numChannels) {
- return new CdcRecordChannelComputer(numChannels, schema,
shuffleByPartitionEnable);
- }
-
- @Override
- public String toString() {
- if (shuffleByPartitionEnable) {
- return "HASH[bucket, partition]";
- } else {
- return "HASH[bucket]";
- }
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
deleted file mode 100644
index 5b986d4e7..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.sink.AbstractStoreWriteOperator;
-import org.apache.paimon.flink.sink.LogSinkFunction;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.SinkRecord;
-
-import org.apache.flink.runtime.state.StateInitializationContext;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
-
-/**
- * An {@link AbstractStoreWriteOperator} which is aware of schema changes.
- *
- * <p>When the input {@link CdcRecord} contains a field name not in the
current {@link TableSchema},
- * it periodically queries the latest schema, until the latest schema contains
that field name.
- */
-public class SchemaAwareStoreWriteOperator extends
AbstractStoreWriteOperator<CdcRecord> {
-
- static final ConfigOption<Duration> RETRY_SLEEP_TIME =
- ConfigOptions.key("cdc.retry-sleep-time")
- .durationType()
- .defaultValue(Duration.ofMillis(500));
-
- private final long retrySleepMillis;
-
- public SchemaAwareStoreWriteOperator(
- FileStoreTable table,
- @Nullable LogSinkFunction logSinkFunction,
- StoreSinkWrite.Provider storeSinkWriteProvider) {
- super(table, logSinkFunction, storeSinkWriteProvider);
- retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws
Exception {
- table = table.copyWithLatestSchema();
- super.initializeState(context);
- }
-
- @Override
- protected SinkRecord processRecord(CdcRecord record) throws Exception {
- Optional<GenericRow> optionalConverted =
record.toGenericRow(table.schema().fields());
- if (!optionalConverted.isPresent()) {
- while (true) {
- table = table.copyWithLatestSchema();
- optionalConverted =
record.toGenericRow(table.schema().fields());
- if (optionalConverted.isPresent()) {
- break;
- }
- Thread.sleep(retrySleepMillis);
- }
- write.replace(commitUser -> table.newWrite(commitUser));
- }
-
- try {
- return write.write(optionalConverted.get());
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
deleted file mode 100644
index a5e240c12..000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FileStoreShuffleBucketTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.CatalogITCaseBase;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.SinkRecord;
-import org.apache.paimon.table.sink.TableWriteImpl;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-
-import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/** Tests of shuffle data by bucket and partition. */
-public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
- private static final int TOTAL_SOURCE_RECORD_COUNT = 1000;
-
- @BeforeEach
- public void after() throws Exception {
- super.before();
- CollectStoreSinkWrite.writeRowsMap.clear();
- }
-
- @Override
- protected List<String> ddl() {
- return Collections.singletonList(
- "CREATE TABLE T (a INT, b INT, c INT, d INT, PRIMARY KEY (a,
b) NOT ENFORCED) PARTITIONED BY (a)");
- }
-
- @Test
- public void testShuffleByBucket() throws Exception {
- FileStoreTable table =
- FileStoreTableFactory.create(LocalFileIO.create(),
getTableDirectory("T"));
-
- insertDataToTable(table);
-
- // Only one task will write records shuffled by bucket
- assertEquals(CollectStoreSinkWrite.writeRowsMap.size(), 1);
- }
-
- @Test
- public void testShuffleByBucketPartition() throws Exception {
- FileStoreTable originalTable =
- FileStoreTableFactory.create(LocalFileIO.create(),
getTableDirectory("T"));
- Map<String, String> dynamicOptions =
originalTable.coreOptions().toMap();
-
dynamicOptions.put(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key(),
"true");
- FileStoreTable table = originalTable.copy(dynamicOptions);
-
- insertDataToTable(table);
-
- // Two tasks will write records shuffled by bucket and partition
- assertEquals(CollectStoreSinkWrite.writeRowsMap.size(), 2);
- }
-
- private void insertDataToTable(FileStoreTable table) throws Exception {
- RowType rowType = toLogicalType(table.rowType());
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(2);
-
- List<RowData> sourceDataList = generateData();
- DataStreamSource<RowData> sourceStream =
- env.fromCollection(sourceDataList,
InternalTypeInfo.of(rowType));
- new FlinkSinkBuilder(table)
- .withInput(sourceStream)
- .withParallelism(env.getParallelism())
- .withSinkProvider(
- "testUser",
- (StoreSinkWrite.Provider)
- (table1, context, ioManager) ->
- (StoreSinkWrite) new
CollectStoreSinkWrite())
- .build();
- env.execute();
-
- assertEquals(
-
CollectStoreSinkWrite.writeRowsMap.values().stream().mapToInt(List::size).sum(),
- TOTAL_SOURCE_RECORD_COUNT);
- }
-
- private List<RowData> generateData() {
- List<RowData> rowDataList = new ArrayList<>(TOTAL_SOURCE_RECORD_COUNT);
- for (int i = 0; i < TOTAL_SOURCE_RECORD_COUNT; i++) {
- rowDataList.add(GenericRowData.of(i, i + 1, i + 2, i + 3));
- }
- return rowDataList;
- }
-
- /** Collect all received data with writer. */
- private static class CollectStoreSinkWrite implements StoreSinkWrite {
- private static final Map<StoreSinkWrite, List<InternalRow>>
writeRowsMap =
- new ConcurrentHashMap<>();
-
- @Override
- public SinkRecord write(InternalRow rowData) throws Exception {
- List<InternalRow> rows = writeRowsMap.computeIfAbsent(this, key ->
new ArrayList<>());
- rows.add(rowData);
- return null;
- }
-
- @Override
- public SinkRecord toLogRecord(SinkRecord record) {
- return record;
- }
-
- @Override
- public void compact(BinaryRow partition, int bucket, boolean
fullCompaction)
- throws Exception {}
-
- @Override
- public void notifyNewFiles(
- long snapshotId, BinaryRow partition, int bucket,
List<DataFileMeta> files) {}
-
- @Override
- public List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
- throws IOException {
- return Collections.emptyList();
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws
Exception {}
-
- @Override
- public void close() throws Exception {}
-
- @Override
- public void replace(Function<String, TableWriteImpl<?>>
newWriteProvider)
- throws Exception {}
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
new file mode 100644
index 000000000..710f8bece
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/RowDataChannelComputerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RowDataChannelComputer}. */
+public class RowDataChannelComputerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testSchemaWithPartition() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.DOUBLE()},
+ new String[] {"pt", "k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ new HashMap<>(),
+ ""));
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numInputs = random.nextInt(1000) + 1;
+ List<RowData> input = new ArrayList<>();
+ for (int i = 0; i < numInputs; i++) {
+ input.add(
+ GenericRowData.of(
+ random.nextInt(10) + 1, random.nextLong(),
random.nextDouble()));
+ }
+
+ testImpl(schema, input);
+ }
+
+ @Test
+ public void testSchemaNoPartition() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.DOUBLE()},
+ new String[] {"k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ new HashMap<>(),
+ ""));
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numInputs = random.nextInt(1000) + 1;
+ List<RowData> input = new ArrayList<>();
+ for (int i = 0; i < numInputs; i++) {
+ input.add(GenericRowData.of(random.nextLong(),
random.nextDouble()));
+ }
+
+ testImpl(schema, input);
+ }
+
+ private void testImpl(TableSchema schema, List<RowData> input) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ RowDataKeyAndBucketExtractor extractor = new
RowDataKeyAndBucketExtractor(schema);
+
+ int numChannels = random.nextInt(10) + 1;
+ boolean hasLogSink = random.nextBoolean();
+ RowDataChannelComputer channelComputer = new
RowDataChannelComputer(schema, hasLogSink);
+ channelComputer.setup(numChannels);
+
+ // assert that channel(record) and channel(partition, bucket) gives
the same result
+
+ for (RowData rowData : input) {
+ extractor.setRecord(rowData);
+ BinaryRow partition = extractor.partition();
+ int bucket = extractor.bucket();
+
+ assertThat(channelComputer.channel(rowData))
+ .isEqualTo(channelComputer.channel(partition, bucket));
+ }
+
+ // assert that distribution should be even
+
+ int numTests = random.nextInt(10) + 1;
+ for (int test = 0; test < numTests; test++) {
+ Map<Integer, Integer> bucketsPerChannel = new HashMap<>();
+ for (int i = 0; i < numChannels; i++) {
+ bucketsPerChannel.put(i, 0);
+ }
+
+ extractor.setRecord(input.get(random.nextInt(input.size())));
+ BinaryRow partition = extractor.partition();
+
+ int numBuckets = random.nextInt(numChannels * 4) + 1;
+ for (int i = 0; i < numBuckets; i++) {
+ int channel = channelComputer.channel(partition, i);
+ bucketsPerChannel.compute(channel, (k, v) -> v + 1);
+ }
+
+ int max =
bucketsPerChannel.values().stream().max(Integer::compareTo).get();
+ int min =
bucketsPerChannel.values().stream().min(Integer::compareTo).get();
+ assertThat(max - min).isLessThanOrEqualTo(1);
+ }
+
+ // log sinks like Kafka only consider bucket and don't care about
partition
+ // so same bucket, even from different partition, must go to the same
channel
+
+ if (hasLogSink) {
+ Map<Integer, Set<Integer>> channelsPerBucket = new HashMap<>();
+ for (RowData rowData : input) {
+ extractor.setRecord(rowData);
+ int bucket = extractor.bucket();
+ channelsPerBucket
+ .computeIfAbsent(bucket, k -> new HashSet<>())
+ .add(channelComputer.channel(rowData));
+ }
+ for (Set<Integer> channels : channelsPerBucket.values()) {
+ assertThat(channels).hasSize(1);
+ }
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index c9135ec2c..4748fc1d9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -43,16 +43,15 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link FileStoreSink} when writing file store and with
savepoints. */
public class SinkSavepointITCase extends AbstractTestBase {
@@ -65,15 +64,21 @@ public class SinkSavepointITCase extends AbstractTestBase {
path = getTempDirPath();
// for failure tests
failingName = UUID.randomUUID().toString();
- FailingFileIO.reset(failingName, 100, 500);
}
@Test
- @Timeout(180000)
+ @Timeout(180)
public void testRecoverFromSavepoint() throws Exception {
String failingPath = FailingFileIO.getFailingPath(failingName, path);
String savepointPath = null;
+
ThreadLocalRandom random = ThreadLocalRandom.current();
+ boolean enableFailure = random.nextBoolean();
+ if (enableFailure) {
+ FailingFileIO.reset(failingName, 100, 500);
+ } else {
+ FailingFileIO.reset(failingName, 0, 1);
+ }
OUTER:
while (true) {
@@ -117,7 +122,8 @@ public class SinkSavepointITCase extends AbstractTestBase {
// recover from savepoint in the next round
}
- checkRecoverFromSavepointResult(failingPath);
+ checkRecoverFromSavepointBatchResult();
+ checkRecoverFromSavepointStreamingResult();
}
private JobClient runRecoverFromSavepointJob(String failingPath, String
savepointPath)
@@ -139,20 +145,25 @@ public class SinkSavepointITCase extends AbstractTestBase
{
tEnv.getConfig()
.getConfiguration()
.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" +
path + "/checkpoint");
+ // input data must be strictly ordered for us to check changelog
results
tEnv.getConfig()
.getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
- // we're creating multiple table environments in the same process
- // if we do not set this option, stream node id will be different even
with the same SQL
- // if stream node id is different then we can't recover from savepoint
- tEnv.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ String createCatalogSql =
+ String.join(
+ "\n",
+ "CREATE CATALOG my_catalog WITH (",
+ " 'type' = 'paimon',",
+ " 'warehouse' = '" + failingPath + "'",
+ ")");
+ FailingFileIO.retryArtificialException(() ->
tEnv.executeSql(createCatalogSql));
+ tEnv.executeSql("USE CATALOG my_catalog");
tEnv.executeSql(
String.join(
"\n",
- "CREATE TABLE S (",
+ "CREATE TEMPORARY TABLE S (",
" a INT",
") WITH (",
" 'connector' = 'datagen',",
@@ -162,29 +173,26 @@ public class SinkSavepointITCase extends AbstractTestBase
{
" 'fields.a.end' = '99999'",
")"));
- String createCatalogSql =
- String.join(
- "\n",
- "CREATE CATALOG my_catalog WITH (",
- " 'type' = 'paimon',",
- " 'warehouse' = '" + failingPath + "'",
- ")");
- FailingFileIO.retryArtificialException(() ->
tEnv.executeSql(createCatalogSql));
-
- tEnv.executeSql("USE CATALOG my_catalog");
-
String createSinkSql =
String.join(
"\n",
"CREATE TABLE IF NOT EXISTS T (",
- " a INT",
+ " k INT,",
+ " v INT,",
+ " PRIMARY KEY (k) NOT ENFORCED",
") WITH (",
- " 'bucket' = '2',",
- " 'file.format' = 'avro'",
+ " 'bucket' = '4',",
+ " 'file.format' = 'avro',",
+ " 'changelog-producer' = 'full-compaction',",
+ " 'full-compaction.delta-commits' = '3'",
")");
FailingFileIO.retryArtificialException(() ->
tEnv.executeSql(createSinkSql));
- String insertIntoSql = "INSERT INTO T SELECT * FROM
default_catalog.default_database.S";
+ // test changing sink parallelism by using a random parallelism
+ String insertIntoSql =
+ String.format(
+ "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '%d')
*/ SELECT (a %% 15000) AS k, a AS v FROM S",
+ ThreadLocalRandom.current().nextInt(3) + 2);
JobClient jobClient =
FailingFileIO.retryArtificialException(() ->
tEnv.executeSql(insertIntoSql))
.getJobClient()
@@ -196,32 +204,89 @@ public class SinkSavepointITCase extends AbstractTestBase
{
return jobClient;
}
- private void checkRecoverFromSavepointResult(String failingPath) throws
Exception {
+ private void checkRecoverFromSavepointBatchResult() throws Exception {
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
- // no failure should occur when checking for answer
- FailingFileIO.reset(failingName, 0, 1);
- String createCatalogSql =
+ tEnv.executeSql(
String.join(
"\n",
"CREATE CATALOG my_catalog WITH (",
" 'type' = 'paimon',",
- " 'warehouse' = '" + failingPath + "'",
- ")");
- tEnv.executeSql(createCatalogSql);
-
+ " 'warehouse' = '" + path + "'",
+ ")"));
tEnv.executeSql("USE CATALOG my_catalog");
- List<Integer> actual = new ArrayList<>();
+ Map<Integer, Integer> expected = new HashMap<>();
+ for (int i = 0; i < 100000; i++) {
+ expected.put(i % 15000, i);
+ }
+
+ Map<Integer, Integer> actual = new HashMap<>();
try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM
T").collect()) {
while (it.hasNext()) {
Row row = it.next();
- assertEquals(1, row.getArity());
- actual.add((Integer) row.getField(0));
+ assertThat(row.getArity()).isEqualTo(2);
+ actual.put((Integer) row.getField(0), (Integer)
row.getField(1));
+ }
+ }
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private void checkRecoverFromSavepointStreamingResult() throws Exception {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
+ TableEnvironment tEnv = TableEnvironment.create(settings);
+
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG my_catalog WITH (",
+ " 'type' = 'paimon',",
+ " 'warehouse' = '" + path + "'",
+ ")"));
+ tEnv.executeSql("USE CATALOG my_catalog");
+
+ Map<Integer, Integer> expected = new HashMap<>();
+ for (int i = 0; i < 100000; i++) {
+ expected.put(i % 15000, i);
+ }
+ Set<Integer> expectedValues = new HashSet<>(expected.values());
+
+ int endCount = 0;
+ Map<Integer, Integer> actual = new HashMap<>();
+ try (CloseableIterator<Row> it =
+ tEnv.executeSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id' = '2') */")
+ .collect()) {
+ while (it.hasNext()) {
+ Row row = it.next();
+ assertThat(row.getArity()).isEqualTo(2);
+
+ int k = (Integer) row.getField(0);
+ int v = (Integer) row.getField(1);
+ switch (row.getKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ assertThat(actual).doesNotContainKey(k);
+ actual.put(k, v);
+ break;
+ case DELETE:
+ case UPDATE_BEFORE:
+ assertThat(actual).containsKey(k);
+ actual.remove(k);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown row kind " + row.getKind());
+ }
+
+ if (expectedValues.contains(v)) {
+ endCount++;
+ }
+ if (endCount >= expectedValues.size()) {
+ break;
+ }
}
}
- Collections.sort(actual);
- assertEquals(IntStream.range(0,
100000).boxed().collect(Collectors.toList()), actual);
+ assertThat(actual).isEqualTo(expected);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
new file mode 100644
index 000000000..ccfc4adad
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CdcRecordChannelComputer}. */
+public class CdcRecordChannelComputerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testSchemaWithPartition() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.DOUBLE()},
+ new String[] {"pt", "k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ new HashMap<>(),
+ ""));
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numInputs = random.nextInt(1000) + 1;
+ List<Map<String, String>> input = new ArrayList<>();
+ for (int i = 0; i < numInputs; i++) {
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pt", String.valueOf(random.nextInt(10) + 1));
+ fields.put("k", String.valueOf(random.nextLong()));
+ fields.put("v", String.valueOf(random.nextDouble()));
+ input.add(fields);
+ }
+
+ testImpl(schema, input);
+ }
+
+ @Test
+ public void testSchemaNoPartition() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.DOUBLE()},
+ new String[] {"k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ new HashMap<>(),
+ ""));
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numInputs = random.nextInt(1000) + 1;
+ List<Map<String, String>> input = new ArrayList<>();
+ for (int i = 0; i < numInputs; i++) {
+ Map<String, String> fields = new HashMap<>();
+ fields.put("k", String.valueOf(random.nextLong()));
+ fields.put("v", String.valueOf(random.nextDouble()));
+ input.add(fields);
+ }
+
+ testImpl(schema, input);
+ }
+
+ private void testImpl(TableSchema schema, List<Map<String, String>> input)
{
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ CdcRecordKeyAndBucketExtractor extractor = new
CdcRecordKeyAndBucketExtractor(schema);
+
+ int numChannels = random.nextInt(10) + 1;
+ CdcRecordChannelComputer channelComputer = new
CdcRecordChannelComputer(schema);
+ channelComputer.setup(numChannels);
+
+ // assert that channel(record) and channel(partition, bucket) gives
the same result
+
+ for (Map<String, String> fields : input) {
+ CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
+ CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
+
+ extractor.setRecord(random.nextBoolean() ? insertRecord :
deleteRecord);
+ BinaryRow partition = extractor.partition();
+ int bucket = extractor.bucket();
+
+ assertThat(channelComputer.channel(insertRecord))
+ .isEqualTo(channelComputer.channel(partition, bucket));
+ assertThat(channelComputer.channel(deleteRecord))
+ .isEqualTo(channelComputer.channel(partition, bucket));
+ }
+
+ // assert that distribution should be even
+
+ int numTests = random.nextInt(10) + 1;
+ for (int test = 0; test < numTests; test++) {
+ Map<Integer, Integer> bucketsPerChannel = new HashMap<>();
+ for (int i = 0; i < numChannels; i++) {
+ bucketsPerChannel.put(i, 0);
+ }
+
+ Map<String, String> fields =
input.get(random.nextInt(input.size()));
+ extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ BinaryRow partition = extractor.partition();
+
+ int numBuckets = random.nextInt(numChannels * 4) + 1;
+ for (int i = 0; i < numBuckets; i++) {
+ int channel = channelComputer.channel(partition, i);
+ bucketsPerChannel.compute(channel, (k, v) -> v + 1);
+ }
+
+ int max =
bucketsPerChannel.values().stream().max(Integer::compareTo).get();
+ int min =
bucketsPerChannel.values().stream().min(Integer::compareTo).get();
+ assertThat(max - min).isLessThanOrEqualTo(1);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
similarity index 96%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 38ec56338..5ed9f70a4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -62,8 +62,8 @@ import java.util.function.Predicate;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link SchemaAwareStoreWriteOperator}. */
-public class SchemaAwareStoreWriteOperatorTest {
+/** Tests for {@link CdcRecordStoreWriteOperator}. */
+public class CdcRecordStoreWriteOperatorTest {
@TempDir java.nio.file.Path tempDir;
@@ -252,13 +252,13 @@ public class SchemaAwareStoreWriteOperatorTest {
private OneInputStreamOperatorTestHarness<CdcRecord, Committable>
createTestHarness(
FileStoreTable table) throws Exception {
- SchemaAwareStoreWriteOperator operator =
- new SchemaAwareStoreWriteOperator(
+ CdcRecordStoreWriteOperator operator =
+ new CdcRecordStoreWriteOperator(
table,
- null,
- (t, context, ioManager) ->
+ (t, commitUser, state, ioManager) ->
new StoreSinkWriteImpl(
- t, context, commitUser, ioManager,
false, false));
+ t, commitUser, state, ioManager,
false, false),
+ commitUser);
TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
TypeSerializer<Committable> outputSerializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
@@ -271,7 +271,7 @@ public class SchemaAwareStoreWriteOperatorTest {
private FileStoreTable createFileStoreTable(
RowType rowType, List<String> partitions, List<String>
primaryKeys) throws Exception {
Options conf = new Options();
- conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME,
Duration.ofMillis(10));
+ conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME,
Duration.ofMillis(10));
TableSchema tableSchema =
SchemaUtils.forceCommit(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index bc600a86f..eaac7448b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -226,7 +225,6 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
conf.set(CoreOptions.BUCKET, numBucket);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
- conf.set(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION,
shuffleByPartitionEnable);
TableSchema tableSchema =
SchemaUtils.forceCommit(