This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4152543a6 [flink] Refactor FlinkSink to clarify the relationships between different bucket-mode tables and sinks (#2636)
4152543a6 is described below

commit 4152543a647fb97b77d054f6cb8fb2fd03da0245
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Thu Jan 4 17:17:25 2024 +0800

    [flink] Refactor FlinkSink to clarify the relationships between different bucket-mode tables and sinks (#2636)
---
 .../{FlinkCdcSink.java => CdcFixedBucketSink.java} |  7 +++--
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |  2 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  7 +++--
 .../{FileStoreSink.java => FixedBucketSink.java}   |  6 ++--
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  7 ++---
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  6 ++--
 ...ileStoreSink.java => RowUnawareBucketSink.java} | 25 +++++++---------
 ...BucketWriteSink.java => UnawareBucketSink.java} | 35 +++++++++++-----------
 .../org/apache/paimon/flink/FileStoreITCase.java   |  4 +--
 .../apache/paimon/flink/sink/FlinkSinkTest.java    |  6 ++--
 .../paimon/flink/sink/SinkSavepointITCase.java     |  2 +-
 11 files changed, 52 insertions(+), 55 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
similarity index 86%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
index 870bf5361..59bdb192b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
@@ -27,13 +27,14 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
 /**
- * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema 
change if necessary.
+ * A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} 
and waits for a schema
+ * change if necessary.
  */
-public class FlinkCdcSink extends FlinkWriteSink<CdcRecord> {
+public class CdcFixedBucketSink extends FlinkWriteSink<CdcRecord> {
 
     private static final long serialVersionUID = 1L;
 
-    public FlinkCdcSink(FileStoreTable table) {
+    public CdcFixedBucketSink(FileStoreTable table) {
         super(table, null);
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 1bda75695..6c17ee6bd 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -128,6 +128,6 @@ public class CdcSinkBuilder<T> {
         FileStoreTable dataTable = (FileStoreTable) table;
         DataStream<CdcRecord> partitioned =
                 partition(parsed, new 
CdcRecordChannelComputer(dataTable.schema()), parallelism);
-        return new FlinkCdcSink(dataTable).sinkFrom(partitioned);
+        return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index d4c630777..b09d09ad9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -43,8 +44,8 @@ import static 
org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /**
- * Builder for {@link FlinkCdcSink} when syncing the whole database into one 
Paimon database. Each
- * database table will be written into a separate Paimon table.
+ * Builder for CDC {@link FlinkWriteSink} when syncing the whole database into 
one Paimon database.
+ * Each database table will be written into a separate Paimon table.
  *
  * <p>This builder will create a separate sink for each Paimon sink table. 
Thus this implementation
  * is not very efficient in resource saving.
@@ -167,7 +168,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     private void buildForFixedBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
         DataStream<CdcRecord> partitioned =
                 partition(parsed, new 
CdcRecordChannelComputer(table.schema()), parallelism);
-        new FlinkCdcSink(table).sinkFrom(partitioned);
+        new CdcFixedBucketSink(table).sinkFrom(partitioned);
     }
 
     private void buildDividedCdcSink() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
similarity index 90%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
index 5481ae1f5..613bf369b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
@@ -27,14 +27,14 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkWriteSink<InternalRow> {
+/** {@link FlinkSink} for writing records into fixed bucket Paimon table. */
+public class FixedBucketSink extends FlinkWriteSink<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
     @Nullable private final LogSinkFunction logSinkFunction;
 
-    public FileStoreSink(
+    public FixedBucketSink(
             FileStoreTable table,
             @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkFunction logSinkFunction) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f72f66885..766edc499 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -149,8 +149,7 @@ public abstract class FlinkSink<T> implements Serializable {
         assertNoSinkMaterializer(input);
 
         // do the actually writing action, no snapshot generated in this stage
-        SingleOutputStreamOperator<Committable> written =
-                doWrite(input, initialCommitUser, input.getParallelism());
+        DataStream<Committable> written = doWrite(input, initialCommitUser, 
input.getParallelism());
 
         // commit the committable to generate a new snapshot
         return doCommit(written, initialCommitUser);
@@ -180,8 +179,8 @@ public abstract class FlinkSink<T> implements Serializable {
         }
     }
 
-    public SingleOutputStreamOperator<Committable> doWrite(
-            DataStream<T> input, String commitUser, Integer parallelism) {
+    public DataStream<Committable> doWrite(
+            DataStream<T> input, String commitUser, @Nullable Integer 
parallelism) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         boolean isStreaming =
                 StreamExecutionEnvironmentUtils.getConfiguration(env)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 46a0c09fa..17926093c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -35,7 +35,7 @@ import java.util.Map;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Builder for {@link FileStoreSink}. */
+/** Builder for {@link FlinkSink}. */
 public class FlinkSinkBuilder {
 
     private final FileStoreTable table;
@@ -136,7 +136,7 @@ public class FlinkSinkBuilder {
                         input,
                         new RowDataChannelComputer(table.schema(), 
logSinkFunction != null),
                         parallelism);
-        FileStoreSink sink = new FileStoreSink(table, overwritePartition, 
logSinkFunction);
+        FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, 
logSinkFunction);
         return sink.sinkFrom(partitioned);
     }
 
@@ -144,7 +144,7 @@ public class FlinkSinkBuilder {
         checkArgument(
                 table instanceof AppendOnlyFileStoreTable,
                 "Unaware bucket mode only works with append-only table for 
now.");
-        return new UnawareBucketWriteSink(
+        return new RowUnawareBucketSink(
                         (AppendOnlyFileStoreTable) table,
                         overwritePartition,
                         logSinkFunction,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
similarity index 68%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index 5481ae1f5..d838a9aa4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -19,27 +19,22 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
-import javax.annotation.Nullable;
-
 import java.util.Map;
 
-/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkWriteSink<InternalRow> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Nullable private final LogSinkFunction logSinkFunction;
+/** An {@link UnawareBucketSink} which handles {@link InternalRow}. */
+public class RowUnawareBucketSink extends UnawareBucketSink<InternalRow> {
 
-    public FileStoreSink(
-            FileStoreTable table,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction) {
-        super(table, overwritePartition);
-        this.logSinkFunction = logSinkFunction;
+    public RowUnawareBucketSink(
+            AppendOnlyFileStoreTable table,
+            Map<String, String> overwritePartitions,
+            LogSinkFunction logSinkFunction,
+            Integer parallelism,
+            boolean boundedInput) {
+        super(table, overwritePartitions, logSinkFunction, parallelism, 
boundedInput);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
similarity index 71%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index 802a98dcf..cb2481063 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -18,48 +18,50 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
 
 import java.util.Map;
 
 /**
- * Build topology for unaware bucket table sink.
+ * Sink for unaware-bucket table.
  *
  * <p>Note: in unaware-bucket mode, we don't shuffle by bucket in inserting. 
We can assign
  * compaction to the inserting jobs aside.
  */
-public class UnawareBucketWriteSink extends FileStoreSink {
+public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
+
+    protected final AppendOnlyFileStoreTable table;
+    protected final LogSinkFunction logSinkFunction;
 
-    private final boolean enableCompaction;
-    private final AppendOnlyFileStoreTable table;
-    private final Integer parallelism;
+    @Nullable private final Integer parallelism;
     private final boolean boundedInput;
 
-    public UnawareBucketWriteSink(
+    public UnawareBucketSink(
             AppendOnlyFileStoreTable table,
-            Map<String, String> overwritePartitions,
+            @Nullable Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism,
+            @Nullable Integer parallelism,
             boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction);
+        super(table, overwritePartitions);
         this.table = table;
-        this.enableCompaction = !table.coreOptions().writeOnly();
+        this.logSinkFunction = logSinkFunction;
         this.parallelism = parallelism;
         this.boundedInput = boundedInput;
     }
 
     @Override
-    public DataStreamSink<?> sinkFrom(DataStream<InternalRow> input, String 
initialCommitUser) {
-        // do the actually writing action, no snapshot generated in this stage
-        DataStream<Committable> written = doWrite(input, initialCommitUser, 
parallelism);
+    public DataStream<Committable> doWrite(
+            DataStream<T> input, String initialCommitUser, @Nullable Integer 
parallelism) {
+        DataStream<Committable> written = super.doWrite(input, 
initialCommitUser, this.parallelism);
 
+        boolean enableCompaction = !table.coreOptions().writeOnly();
         boolean isStreamingMode =
                 input.getExecutionEnvironment()
                                 .getConfiguration()
@@ -75,7 +77,6 @@ public class UnawareBucketWriteSink extends FileStoreSink {
             written = 
written.union(builder.fetchUncommitted(initialCommitUser));
         }
 
-        // commit the committable to generate a new snapshot
-        return doCommit(written, initialCommitUser);
+        return written;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index cb62fcbfa..b7c739e96 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FileStoreSink;
+import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.source.ContinuousFileStoreSource;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
@@ -85,7 +85,7 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /**
  * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} 
and {@link
- * FileStoreSink}.
+ * FixedBucketSink}.
  */
 @ExtendWith(ParameterizedTestExtension.class)
 public class FileStoreITCase extends AbstractTestBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 16c0e2814..b214a4d32 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -48,8 +48,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -231,8 +231,8 @@ public class FlinkSinkTest {
         DataStreamSource<InternalRow> source =
                 streamExecutionEnvironment.fromCollection(
                         Collections.singletonList(GenericRow.of(1, 1)));
-        FlinkSink<InternalRow> flinkSink = new FileStoreSink(fileStoreTable, 
null, null);
-        SingleOutputStreamOperator<Committable> written = 
flinkSink.doWrite(source, "123", 1);
+        FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable, 
null, null);
+        DataStream<Committable> written = flinkSink.doWrite(source, "123", 1);
         RowDataStoreWriteOperator operator =
                 ((RowDataStoreWriteOperator)
                         ((SimpleOperatorFactory)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index 4748fc1d9..8fd1383ff 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -53,7 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** IT cases for {@link FileStoreSink} when writing file store and with 
savepoints. */
+/** IT cases for {@link FlinkSink} when writing file store and with 
savepoints. */
 public class SinkSavepointITCase extends AbstractTestBase {
 
     private String path;

Reply via email to