This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new 4152543a6  [flink] Refactor FlinkSink to clarify the relationships between different bucket-mode tables and sinks (#2636)
4152543a6 is described below

commit 4152543a647fb97b77d054f6cb8fb2fd03da0245
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Thu Jan 4 17:17:25 2024 +0800

    [flink] Refactor FlinkSink to clarify the relationships between different bucket-mode tables and sinks (#2636)
---
 .../{FlinkCdcSink.java => CdcFixedBucketSink.java} |  7 +++--
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |  2 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  7 +++--
 .../{FileStoreSink.java => FixedBucketSink.java}   |  6 ++--
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  7 ++---
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  6 ++--
 ...ileStoreSink.java => RowUnawareBucketSink.java} | 25 +++++++---------
 ...BucketWriteSink.java => UnawareBucketSink.java} | 35 +++++++++++-----------
 .../org/apache/paimon/flink/FileStoreITCase.java   |  4 +--
 .../apache/paimon/flink/sink/FlinkSinkTest.java    |  6 ++--
 .../paimon/flink/sink/SinkSavepointITCase.java     |  2 +-
 11 files changed, 52 insertions(+), 55 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
similarity index 86%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
index 870bf5361..59bdb192b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketSink.java
@@ -27,13 +27,14 @@ import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
 /**
- * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary.
+ * A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} and waits for a schema
+ * change if necessary.
  */
-public class FlinkCdcSink extends FlinkWriteSink<CdcRecord> {
+public class CdcFixedBucketSink extends FlinkWriteSink<CdcRecord> {
 
     private static final long serialVersionUID = 1L;
 
-    public FlinkCdcSink(FileStoreTable table) {
+    public CdcFixedBucketSink(FileStoreTable table) {
         super(table, null);
     }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 1bda75695..6c17ee6bd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -128,6 +128,6 @@ public class CdcSinkBuilder<T> {
         FileStoreTable dataTable = (FileStoreTable) table;
         DataStream<CdcRecord> partitioned =
                 partition(parsed, new CdcRecordChannelComputer(dataTable.schema()), parallelism);
-        return new FlinkCdcSink(dataTable).sinkFrom(partitioned);
+        return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
     }
 }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index d4c630777..b09d09ad9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -43,8 +44,8 @@ import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /**
- * Builder for {@link FlinkCdcSink} when syncing the whole database into one Paimon database. Each
- * database table will be written into a separate Paimon table.
+ * Builder for CDC {@link FlinkWriteSink} when syncing the whole database into one Paimon database.
+ * Each database table will be written into a separate Paimon table.
  *
  * <p>This builder will create a separate sink for each Paimon sink table. Thus this implementation
  * is not very efficient in resource saving.
@@ -167,7 +168,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     private void buildForFixedBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
         DataStream<CdcRecord> partitioned =
                 partition(parsed, new CdcRecordChannelComputer(table.schema()), parallelism);
-        new FlinkCdcSink(table).sinkFrom(partitioned);
+        new CdcFixedBucketSink(table).sinkFrom(partitioned);
     }
 
     private void buildDividedCdcSink() {

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
similarity index 90%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
index 5481ae1f5..613bf369b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java
@@ -27,14 +27,14 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkWriteSink<InternalRow> {
+/** {@link FlinkSink} for writing records into fixed bucket Paimon table. */
+public class FixedBucketSink extends FlinkWriteSink<InternalRow> {
 
     private static final long serialVersionUID = 1L;
 
     @Nullable private final LogSinkFunction logSinkFunction;
 
-    public FileStoreSink(
+    public FixedBucketSink(
             FileStoreTable table,
             @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkFunction logSinkFunction) {

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f72f66885..766edc499 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -149,8 +149,7 @@ public abstract class FlinkSink<T> implements Serializable {
         assertNoSinkMaterializer(input);
 
         // do the actually writing action, no snapshot generated in this stage
-        SingleOutputStreamOperator<Committable> written =
-                doWrite(input, initialCommitUser, input.getParallelism());
+        DataStream<Committable> written = doWrite(input, initialCommitUser, input.getParallelism());
 
         // commit the committable to generate a new snapshot
         return doCommit(written, initialCommitUser);
@@ -180,8 +179,8 @@ public abstract class FlinkSink<T> implements Serializable {
         }
     }
 
-    public SingleOutputStreamOperator<Committable> doWrite(
-            DataStream<T> input, String commitUser, Integer parallelism) {
+    public DataStream<Committable> doWrite(
+            DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         boolean isStreaming =
                 StreamExecutionEnvironmentUtils.getConfiguration(env)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 46a0c09fa..17926093c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -35,7 +35,7 @@ import java.util.Map;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Builder for {@link FileStoreSink}. */
+/** Builder for {@link FlinkSink}. */
 public class FlinkSinkBuilder {
 
     private final FileStoreTable table;
@@ -136,7 +136,7 @@ public class FlinkSinkBuilder {
                         input,
                         new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                         parallelism);
-        FileStoreSink sink = new FileStoreSink(table, overwritePartition, logSinkFunction);
+        FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
         return sink.sinkFrom(partitioned);
     }
 
@@ -144,7 +144,7 @@ public class FlinkSinkBuilder {
         checkArgument(
                 table instanceof AppendOnlyFileStoreTable,
                 "Unaware bucket mode only works with append-only table for now.");
-        return new UnawareBucketWriteSink(
+        return new RowUnawareBucketSink(
                         (AppendOnlyFileStoreTable) table,
                         overwritePartition,
                         logSinkFunction,

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
similarity index 68%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index 5481ae1f5..d838a9aa4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -19,27 +19,22 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
-import javax.annotation.Nullable;
-
 import java.util.Map;
 
-/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkWriteSink<InternalRow> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Nullable private final LogSinkFunction logSinkFunction;
+/** An {@link UnawareBucketSink} which handles {@link InternalRow}. */
+public class RowUnawareBucketSink extends UnawareBucketSink<InternalRow> {
 
-    public FileStoreSink(
-            FileStoreTable table,
-            @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkFunction logSinkFunction) {
-        super(table, overwritePartition);
-        this.logSinkFunction = logSinkFunction;
+    public RowUnawareBucketSink(
+            AppendOnlyFileStoreTable table,
+            Map<String, String> overwritePartitions,
+            LogSinkFunction logSinkFunction,
+            Integer parallelism,
+            boolean boundedInput) {
+        super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
     }
 
     @Override

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
similarity index 71%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index 802a98dcf..cb2481063 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -18,48 +18,50 @@
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
 
 import java.util.Map;
 
 /**
- * Build topology for unaware bucket table sink.
+ * Sink for unaware-bucket table.
  *
  * <p>Note: in unaware-bucket mode, we don't shuffle by bucket in inserting. We can assign
 * compaction to the inserting jobs aside.
  */
-public class UnawareBucketWriteSink extends FileStoreSink {
+public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
+
+    protected final AppendOnlyFileStoreTable table;
+    protected final LogSinkFunction logSinkFunction;
 
-    private final boolean enableCompaction;
-    private final AppendOnlyFileStoreTable table;
-    private final Integer parallelism;
+    @Nullable private final Integer parallelism;
     private final boolean boundedInput;
 
-    public UnawareBucketWriteSink(
+    public UnawareBucketSink(
             AppendOnlyFileStoreTable table,
-            Map<String, String> overwritePartitions,
+            @Nullable Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism,
+            @Nullable Integer parallelism,
             boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction);
+        super(table, overwritePartitions);
         this.table = table;
-        this.enableCompaction = !table.coreOptions().writeOnly();
+        this.logSinkFunction = logSinkFunction;
         this.parallelism = parallelism;
         this.boundedInput = boundedInput;
     }
 
     @Override
-    public DataStreamSink<?> sinkFrom(DataStream<InternalRow> input, String initialCommitUser) {
-        // do the actually writing action, no snapshot generated in this stage
-        DataStream<Committable> written = doWrite(input, initialCommitUser, parallelism);
+    public DataStream<Committable> doWrite(
+            DataStream<T> input, String initialCommitUser, @Nullable Integer parallelism) {
+        DataStream<Committable> written = super.doWrite(input, initialCommitUser, this.parallelism);
 
+        boolean enableCompaction = !table.coreOptions().writeOnly();
         boolean isStreamingMode =
                 input.getExecutionEnvironment()
                         .getConfiguration()
@@ -75,7 +77,6 @@ public class UnawareBucketWriteSink extends FileStoreSink {
             written = written.union(builder.fetchUncommitted(initialCommitUser));
         }
 
-        // commit the committable to generate a new snapshot
-        return doCommit(written, initialCommitUser);
+        return written;
     }
 }

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index cb62fcbfa..b7c739e96 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FileStoreSink;
+import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.source.ContinuousFileStoreSource;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
@@ -85,7 +85,7 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /**
  * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link
- * FileStoreSink}.
+ * FixedBucketSink}.
  */
 @ExtendWith(ParameterizedTestExtension.class)
 public class FileStoreITCase extends AbstractTestBase {

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 16c0e2814..b214a4d32 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -48,8 +48,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -231,8 +231,8 @@ public class FlinkSinkTest {
         DataStreamSource<InternalRow> source =
                 streamExecutionEnvironment.fromCollection(
                         Collections.singletonList(GenericRow.of(1, 1)));
-        FlinkSink<InternalRow> flinkSink = new FileStoreSink(fileStoreTable, null, null);
-        SingleOutputStreamOperator<Committable> written = flinkSink.doWrite(source, "123", 1);
+        FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable, null, null);
+        DataStream<Committable> written = flinkSink.doWrite(source, "123", 1);
         RowDataStoreWriteOperator operator =
                 ((RowDataStoreWriteOperator)
                         ((SimpleOperatorFactory)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index 4748fc1d9..8fd1383ff 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -53,7 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** IT cases for {@link FileStoreSink} when writing file store and with savepoints. */
+/** IT cases for {@link FlinkSink} when writing file store and with savepoints. */
 public class SinkSavepointITCase extends AbstractTestBase {
 
     private String path;
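The net effect of the diff above is a template-method split: FlinkSink.sinkFrom always runs doWrite followed by doCommit, and UnawareBucketSink now customizes only doWrite (calling super.doWrite, then unioning in compaction committables) instead of overriding sinkFrom and duplicating the commit step as the old UnawareBucketWriteSink did. The self-contained toy below mirrors that shape. It is a minimal sketch only: ToySink, ToyUnawareBucketSink, and ToySinkDemo are illustrative stand-ins, not Paimon classes, and plain lists stand in for Flink DataStreams.

import java.util.ArrayList;
import java.util.List;

// Toy analogue of the refactored hierarchy: the base sink owns the full
// write-then-commit pipeline; subclasses extend only the write stage.
abstract class ToySink<T> {

    // Analogue of FlinkSink.sinkFrom: write, then commit, defined once.
    public final List<String> sinkFrom(List<T> input, String commitUser) {
        return doCommit(doWrite(input, commitUser), commitUser);
    }

    // Analogue of FlinkSink.doWrite: subclasses may append extra stages.
    protected List<String> doWrite(List<T> input, String commitUser) {
        List<String> written = new ArrayList<>();
        for (T record : input) {
            written.add("written(" + record + ")");
        }
        return written;
    }

    // Analogue of FlinkSink.doCommit: the snapshot step, never duplicated.
    private List<String> doCommit(List<String> committables, String commitUser) {
        List<String> result = new ArrayList<>(committables);
        result.add("snapshot-by(" + commitUser + ")");
        return result;
    }
}

// Analogue of UnawareBucketSink: reuses the base write stage via super.doWrite
// and unions in extra compaction output, leaving the commit stage untouched.
class ToyUnawareBucketSink<T> extends ToySink<T> {
    @Override
    protected List<String> doWrite(List<T> input, String commitUser) {
        List<String> written = super.doWrite(input, commitUser);
        written.add("uncommitted-compaction"); // extra write-stage output
        return written;
    }
}

public class ToySinkDemo {
    public static void main(String[] args) {
        System.out.println(
                new ToyUnawareBucketSink<Integer>().sinkFrom(List.of(1, 2), "user-1"));
        // prints: [written(1), written(2), uncommitted-compaction, snapshot-by(user-1)]
    }
}

Keeping doCommit reachable only through sinkFrom means every bucket-mode sink commits snapshots through the same code path, which is the relationship between tables and sinks that the commit title refers to.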