This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eef86147ee [flink] Writer coordinator for compaction (#8128)
eef86147ee is described below
commit eef86147eeae1cb8df09af7f362ecb1190f99f3e
Author: Mao <[email protected]>
AuthorDate: Sat Jun 6 18:49:05 2026 +1000
[flink] Writer coordinator for compaction (#8128)
---
.../apache/paimon/flink/sink/CompactorSink.java | 11 +++-
.../paimon/flink/sink/StoreCompactOperator.java | 76 ++++++++++++++++++++++
.../paimon/flink/sink/CompactorSinkITCase.java | 16 ++++-
.../flink/sink/StoreCompactOperatorTest.java | 71 +++++++++++++++++++-
4 files changed, 169 insertions(+), 5 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a9c6031dfa..cfaabe9564 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -19,11 +19,14 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.data.RowData;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED;
+
/** {@link FlinkSink} for dedicated compact jobs. */
public class CompactorSink extends FlinkSink<RowData> {
@@ -39,7 +42,13 @@ public class CompactorSink extends FlinkSink<RowData> {
@Override
protected OneInputStreamOperatorFactory<RowData, Committable>
createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
- return new StoreCompactOperator.Factory(table, writeProvider,
commitUser, fullCompaction);
+ Options options = table.coreOptions().toConfiguration();
+ boolean coordinatorEnabled =
options.get(SINK_WRITER_COORDINATOR_ENABLED);
+ return coordinatorEnabled
+ ? new StoreCompactOperator.CoordinatedFactory(
+ table, writeProvider, commitUser, fullCompaction)
+ : new StoreCompactOperator.Factory(
+ table, writeProvider, commitUser, fullCompaction);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 0b7984d2f4..d1cbfd3c64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -21,17 +21,24 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
+import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -69,6 +76,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
private transient StoreSinkWrite write;
private transient DataFileMetaSerializer dataFileMetaSerializer;
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
+ protected transient @Nullable WriteRestore writeRestore;
protected transient @Nullable CompactRefresher compactRefresher;
@@ -119,10 +127,17 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
getContainingTask().getEnvironment().getIOManager(),
memoryPoolFactory,
getMetricGroup());
+ if (writeRestore != null) {
+ write.setWriteRestore(writeRestore);
+ }
this.compactRefresher =
CompactRefresher.create(write.streamingMode(), table,
write::replace);
}
+ public void setWriteRestore(@Nullable WriteRestore writeRestore) {
+ this.writeRestore = writeRestore;
+ }
+
@Override
public void open() throws Exception {
super.open();
@@ -243,4 +258,65 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
return StoreCompactOperator.class;
}
}
+
+ /** {@link StreamOperatorFactory} of {@link StoreCompactOperator} with
write coordinator. */
+ public static class CoordinatedFactory
+ extends PrepareCommitOperator.Factory<RowData, Committable>
+ implements CoordinatedOperatorFactory<Committable> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable table;
+ private final StoreSinkWrite.Provider storeSinkWriteProvider;
+ private final String initialCommitUser;
+ private final boolean fullCompaction;
+
+ public CoordinatedFactory(
+ FileStoreTable table,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser,
+ boolean fullCompaction) {
+ super(Options.fromMap(table.options()));
+ Preconditions.checkArgument(
+ !table.coreOptions().writeOnly(),
+ CoreOptions.WRITE_ONLY.key() + " should not be true for
StoreCompactOperator.");
+ this.table = table;
+ this.storeSinkWriteProvider = storeSinkWriteProvider;
+ this.initialCommitUser = initialCommitUser;
+ this.fullCompaction = fullCompaction;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new WriteOperatorCoordinator.Provider(operatorID, table);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<Committable>> T createStreamOperator(
+ StreamOperatorParameters<Committable> parameters) {
+ OperatorID operatorID =
parameters.getStreamConfig().getOperatorID();
+ TaskOperatorEventGateway gateway =
+ parameters
+ .getContainingTask()
+ .getEnvironment()
+ .getOperatorCoordinatorEventGateway();
+ StoreCompactOperator operator =
+ new StoreCompactOperator(
+ parameters,
+ table,
+ storeSinkWriteProvider,
+ initialCommitUser,
+ fullCompaction);
+ operator.setWriteRestore(new CoordinatedWriteRestore(gateway,
operatorID));
+ return (T) operator;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Class<? extends StreamOperator>
getStreamOperatorClass(ClassLoader classLoader) {
+ return StoreCompactOperator.class;
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 7b058328cb..2152a518e8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -49,6 +49,8 @@ import org.apache.flink.table.data.RowData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Arrays;
@@ -80,8 +82,9 @@ public class CompactorSinkITCase extends AbstractTestBase {
commitUser = UUID.randomUUID().toString();
}
- @Test
- public void testCompact() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testCompact(boolean writeCoordinatorEnabled) throws Exception {
FileStoreTable table = createFileStoreTable();
SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
@@ -119,7 +122,14 @@ public class CompactorSinkITCase extends AbstractTestBase {
getSpecifiedPartitions(),
table.coreOptions().partitionDefaultName()))
.build();
- new CompactorSinkBuilder(table, true).withInput(source).build();
+ FileStoreTable sinkTable =
+ writeCoordinatorEnabled
+ ? table.copy(
+ Collections.singletonMap(
+
FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED.key(),
+ "true"))
+ : table;
+ new CompactorSinkBuilder(sinkTable, true).withInput(source).build();
env.execute();
snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index a89cfa66f0..5e2cb10e0e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
+import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.WriteRestore;
@@ -44,6 +46,7 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -106,6 +109,69 @@ public class StoreCompactOperatorTest extends TableTestBase {
assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
}
+ @Test
+ public void testCoordinatorProvider() throws Exception {
+ createTableDefault();
+
+ StoreCompactOperator.CoordinatedFactory operatorFactory =
+ new StoreCompactOperator.CoordinatedFactory(
+ getTableDefault(),
+ (table, commitUser, state, ioManager,
memoryPoolFactory, metricGroup) ->
+ new CompactRememberStoreWrite(true),
+ "10086",
+ false);
+
+ assertThat(operatorFactory.getCoordinatorProvider("compact", new
OperatorID()))
+ .isInstanceOf(WriteOperatorCoordinator.Provider.class);
+
assertThat(operatorFactory.getStreamOperatorClass(getClass().getClassLoader()))
+ .isEqualTo(StoreCompactOperator.class);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCompactWithCoordinator(boolean streamingMode) throws
Exception {
+ createTableDefault();
+
+ CompactRememberStoreWrite compactRememberStoreWrite =
+ new CompactRememberStoreWrite(streamingMode);
+ StoreCompactOperator.CoordinatedFactory operatorFactory =
+ new StoreCompactOperator.CoordinatedFactory(
+ getTableDefault(),
+ (table, commitUser, state, ioManager,
memoryPoolFactory, metricGroup) ->
+ compactRememberStoreWrite,
+ "10086",
+ !streamingMode);
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(operatorFactory);
+ harness.setup(serializer);
+ harness.initializeEmptyState();
+ harness.open();
+
+ // the coordinated factory must wire a CoordinatedWriteRestore into
the write
+ assertThat(compactRememberStoreWrite.capturedWriteRestore)
+ .isInstanceOf(CoordinatedWriteRestore.class);
+
+ harness.processElement(new StreamRecord<>(data(0)));
+ harness.processElement(new StreamRecord<>(data(0)));
+ harness.processElement(new StreamRecord<>(data(1)));
+ harness.processElement(new StreamRecord<>(data(1)));
+ harness.processElement(new StreamRecord<>(data(2)));
+
+ StoreCompactOperator operator = (StoreCompactOperator)
harness.getOperator();
+ assertThat(operator.compactionWaitingSet())
+ .containsExactlyInAnyOrder(
+ Pair.of(BinaryRow.EMPTY_ROW, 0),
+ Pair.of(BinaryRow.EMPTY_ROW, 1),
+ Pair.of(BinaryRow.EMPTY_ROW, 2));
+ assertThat(compactRememberStoreWrite.compactTime).isEqualTo(0);
+ operator.prepareCommit(true, 1);
+ assertThat(operator.compactionWaitingSet()).isEmpty();
+ assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
+ }
+
@Test
public void testStreamingCompactConflictWithOverwrite() throws Exception {
Schema schema =
@@ -239,13 +305,16 @@ public class StoreCompactOperatorTest extends TableTestBase {
private final boolean streamingMode;
private int compactTime = 0;
+ private @Nullable WriteRestore capturedWriteRestore;
public CompactRememberStoreWrite(boolean streamingMode) {
this.streamingMode = streamingMode;
}
@Override
- public void setWriteRestore(WriteRestore writeRestore) {}
+ public void setWriteRestore(WriteRestore writeRestore) {
+ this.capturedWriteRestore = writeRestore;
+ }
@Override
public SinkRecord write(InternalRow rowData) {