This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eef86147ee [flink] Writer coordinator for compaction (#8128)
eef86147ee is described below

commit eef86147eeae1cb8df09af7f362ecb1190f99f3e
Author: Mao <[email protected]>
AuthorDate: Sat Jun 6 18:49:05 2026 +1000

    [flink] Writer coordinator for compaction (#8128)
---
 .../apache/paimon/flink/sink/CompactorSink.java    | 11 +++-
 .../paimon/flink/sink/StoreCompactOperator.java    | 76 ++++++++++++++++++++++
 .../paimon/flink/sink/CompactorSinkITCase.java     | 16 ++++-
 .../flink/sink/StoreCompactOperatorTest.java       | 71 +++++++++++++++++++-
 4 files changed, 169 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a9c6031dfa..cfaabe9564 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -19,11 +19,14 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.data.RowData;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED;
+
 /** {@link FlinkSink} for dedicated compact jobs. */
 public class CompactorSink extends FlinkSink<RowData> {
 
@@ -39,7 +42,13 @@ public class CompactorSink extends FlinkSink<RowData> {
     @Override
     protected OneInputStreamOperatorFactory<RowData, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new StoreCompactOperator.Factory(table, writeProvider, commitUser, fullCompaction);
+        Options options = table.coreOptions().toConfiguration();
+        boolean coordinatorEnabled = options.get(SINK_WRITER_COORDINATOR_ENABLED);
+        return coordinatorEnabled
+                ? new StoreCompactOperator.CoordinatedFactory(
+                        table, writeProvider, commitUser, fullCompaction)
+                : new StoreCompactOperator.Factory(
+                        table, writeProvider, commitUser, fullCompaction);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 0b7984d2f4..d1cbfd3c64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -21,17 +21,24 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
+import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.operation.WriteRestore;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -69,6 +76,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
     private transient StoreSinkWrite write;
     private transient DataFileMetaSerializer dataFileMetaSerializer;
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
+    protected transient @Nullable WriteRestore writeRestore;
 
     protected transient @Nullable CompactRefresher compactRefresher;
 
@@ -119,10 +127,17 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPoolFactory,
                         getMetricGroup());
+        if (writeRestore != null) {
+            write.setWriteRestore(writeRestore);
+        }
         this.compactRefresher =
                 CompactRefresher.create(write.streamingMode(), table, write::replace);
     }
 
+    public void setWriteRestore(@Nullable WriteRestore writeRestore) {
+        this.writeRestore = writeRestore;
+    }
+
     @Override
     public void open() throws Exception {
         super.open();
@@ -243,4 +258,65 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
             return StoreCompactOperator.class;
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link StoreCompactOperator} with write coordinator. */
+    public static class CoordinatedFactory
+            extends PrepareCommitOperator.Factory<RowData, Committable>
+            implements CoordinatedOperatorFactory<Committable> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final FileStoreTable table;
+        private final StoreSinkWrite.Provider storeSinkWriteProvider;
+        private final String initialCommitUser;
+        private final boolean fullCompaction;
+
+        public CoordinatedFactory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser,
+                boolean fullCompaction) {
+            super(Options.fromMap(table.options()));
+            Preconditions.checkArgument(
+                    !table.coreOptions().writeOnly(),
+                    CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
+            this.table = table;
+            this.storeSinkWriteProvider = storeSinkWriteProvider;
+            this.initialCommitUser = initialCommitUser;
+            this.fullCompaction = fullCompaction;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new WriteOperatorCoordinator.Provider(operatorID, table);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
+            TaskOperatorEventGateway gateway =
+                    parameters
+                            .getContainingTask()
+                            .getEnvironment()
+                            .getOperatorCoordinatorEventGateway();
+            StoreCompactOperator operator =
+                    new StoreCompactOperator(
+                            parameters,
+                            table,
+                            storeSinkWriteProvider,
+                            initialCommitUser,
+                            fullCompaction);
+            operator.setWriteRestore(new CoordinatedWriteRestore(gateway, operatorID));
+            return (T) operator;
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return StoreCompactOperator.class;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 7b058328cb..2152a518e8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -49,6 +49,8 @@ import org.apache.flink.table.data.RowData;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -80,8 +82,9 @@ public class CompactorSinkITCase extends AbstractTestBase {
         commitUser = UUID.randomUUID().toString();
     }
 
-    @Test
-    public void testCompact() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCompact(boolean writeCoordinatorEnabled) throws Exception {
         FileStoreTable table = createFileStoreTable();
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamWriteBuilder streamWriteBuilder =
@@ -119,7 +122,14 @@ public class CompactorSinkITCase extends AbstractTestBase {
                                         getSpecifiedPartitions(),
                                         table.coreOptions().partitionDefaultName()))
                         .build();
-        new CompactorSinkBuilder(table, true).withInput(source).build();
+        FileStoreTable sinkTable =
+                writeCoordinatorEnabled
+                        ? table.copy(
+                                Collections.singletonMap(
+                                        FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED.key(),
+                                        "true"))
+                        : table;
+        new CompactorSinkBuilder(sinkTable, true).withInput(source).build();
         env.execute();
 
         snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index a89cfa66f0..5e2cb10e0e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
+import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.WriteRestore;
@@ -44,6 +46,7 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -106,6 +109,69 @@ public class StoreCompactOperatorTest extends TableTestBase {
         assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
     }
 
+    @Test
+    public void testCoordinatorProvider() throws Exception {
+        createTableDefault();
+
+        StoreCompactOperator.CoordinatedFactory operatorFactory =
+                new StoreCompactOperator.CoordinatedFactory(
+                        getTableDefault(),
+                        (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
+                                new CompactRememberStoreWrite(true),
+                        "10086",
+                        false);
+
+        assertThat(operatorFactory.getCoordinatorProvider("compact", new OperatorID()))
+                .isInstanceOf(WriteOperatorCoordinator.Provider.class);
+        assertThat(operatorFactory.getStreamOperatorClass(getClass().getClassLoader()))
+                .isEqualTo(StoreCompactOperator.class);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testCompactWithCoordinator(boolean streamingMode) throws Exception {
+        createTableDefault();
+
+        CompactRememberStoreWrite compactRememberStoreWrite =
+                new CompactRememberStoreWrite(streamingMode);
+        StoreCompactOperator.CoordinatedFactory operatorFactory =
+                new StoreCompactOperator.CoordinatedFactory(
+                        getTableDefault(),
+                        (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
+                                compactRememberStoreWrite,
+                        "10086",
+                        !streamingMode);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operatorFactory);
+        harness.setup(serializer);
+        harness.initializeEmptyState();
+        harness.open();
+
+        // the coordinated factory must wire a CoordinatedWriteRestore into the write
+        assertThat(compactRememberStoreWrite.capturedWriteRestore)
+                .isInstanceOf(CoordinatedWriteRestore.class);
+
+        harness.processElement(new StreamRecord<>(data(0)));
+        harness.processElement(new StreamRecord<>(data(0)));
+        harness.processElement(new StreamRecord<>(data(1)));
+        harness.processElement(new StreamRecord<>(data(1)));
+        harness.processElement(new StreamRecord<>(data(2)));
+
+        StoreCompactOperator operator = (StoreCompactOperator) harness.getOperator();
+        assertThat(operator.compactionWaitingSet())
+                .containsExactlyInAnyOrder(
+                        Pair.of(BinaryRow.EMPTY_ROW, 0),
+                        Pair.of(BinaryRow.EMPTY_ROW, 1),
+                        Pair.of(BinaryRow.EMPTY_ROW, 2));
+        assertThat(compactRememberStoreWrite.compactTime).isEqualTo(0);
+        operator.prepareCommit(true, 1);
+        assertThat(operator.compactionWaitingSet()).isEmpty();
+        assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
+    }
+
     @Test
     public void testStreamingCompactConflictWithOverwrite() throws Exception {
         Schema schema =
@@ -239,13 +305,16 @@ public class StoreCompactOperatorTest extends TableTestBase {
 
         private final boolean streamingMode;
         private int compactTime = 0;
+        private @Nullable WriteRestore capturedWriteRestore;
 
         public CompactRememberStoreWrite(boolean streamingMode) {
             this.streamingMode = streamingMode;
         }
 
         @Override
-        public void setWriteRestore(WriteRestore writeRestore) {}
+        public void setWriteRestore(WriteRestore writeRestore) {
+            this.capturedWriteRestore = writeRestore;
+        }
 
         @Override
         public SinkRecord write(InternalRow rowData) {

Reply via email to