This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb903287 [api] Publish DataStream Sink & Source API (#3192)
ffb903287 is described below

commit ffb903287d38035550255ab69a5ad820ee4785ff
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 15 16:51:25 2024 +0800

    [api] Publish DataStream Sink & Source API (#3192)
---
 docs/content/program-api/flink-api.md              | 114 ++++++++--------
 .../paimon/flink/sink/cdc/RichCdcSinkBuilder.java  |  70 ++++++++--
 .../paimon/flink/action/SortCompactAction.java     |  28 ++--
 .../paimon/flink/action/TableActionBase.java       |   2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  17 ++-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  88 +++++++++----
 .../paimon/flink/sink/FlinkTableSinkBase.java      |  28 ++--
 .../paimon/flink/sink/LogFlinkSinkBuilder.java     |  36 ++++++
 .../paimon/flink/sink/SortCompactSinkBuilder.java  |  34 +++++
 .../paimon/flink/source/DataTableSource.java       |  21 +--
 .../paimon/flink/source/FlinkSourceBuilder.java    | 144 +++++++++++++--------
 .../org/apache/paimon/flink/FileStoreITCase.java   | 113 +++++++++++-----
 12 files changed, 469 insertions(+), 226 deletions(-)

diff --git a/docs/content/program-api/flink-api.md 
b/docs/content/program-api/flink-api.md
index 942208e1a..746258629 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -61,58 +61,65 @@ Please choose your Flink version.
 
 Paimon relies on Hadoop environment, you should add hadoop classpath or 
bundled jar.
 
-Paimon does not provide a DataStream API, but you can read or write to Paimon 
tables by the conversion between DataStream and Table in Flink.
+Besides the DataStream API, you can also read or write Paimon tables through the 
conversion between DataStream and Table in Flink.
 See [DataStream API 
Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/).
 
 ## Write to Table 
 
 ```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
 public class WriteToTable {
 
-    public static void writeTo() {
+    public static void writeTo() throws Exception {
         // create environments of both APIs
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         // for CONTINUOUS_UNBOUNDED source, set checkpoint interval
         // env.enableCheckpointing(60_000);
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
         // create a changelog DataStream
-        DataStream<Row> dataStream =
+        DataStream<Row> input =
                 env.fromElements(
-                        Row.ofKind(RowKind.INSERT, "Alice", 12),
-                        Row.ofKind(RowKind.INSERT, "Bob", 5),
-                        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
-                        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
+                                Row.ofKind(RowKind.INSERT, "Alice", 12),
+                                Row.ofKind(RowKind.INSERT, "Bob", 5),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
+                                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
                         .returns(
                                 Types.ROW_NAMED(
-                                        new String[] {"name", "age"},
-                                        Types.STRING, Types.INT));
+                                        new String[] {"name", "age"}, 
Types.STRING, Types.INT));
 
-        // interpret the DataStream as a Table
-        Schema schema = Schema.newBuilder()
-                .column("name", DataTypes.STRING())
-                .column("age", DataTypes.INT())
-                .build();
-        Table table = tableEnv.fromChangelogStream(dataStream, schema);
+        // get table from catalog
+        Options catalogOptions = new Options();
+        catalogOptions.set("warehouse", "/path/to/warehouse");
+        Catalog catalog = 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        Table table = catalog.getTable(Identifier.create("my_db", "T"));
 
-        // create paimon catalog
-        tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 
'warehouse'='...')");
-        tableEnv.executeSql("USE CATALOG paimon");
+        DataType inputType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("age", DataTypes.INT()));
+        FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, 
inputType);
 
-        // register the table under a name and perform an aggregation
-        tableEnv.createTemporaryView("InputTable", table);
+        // set sink parallelism
+        // builder.parallelism(_your_parallelism)
 
-        // insert into paimon table from your data stream table
-        tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM 
InputTable");
+        // set overwrite mode
+        // builder.overwrite(...)
+
+        builder.build();
+        env.execute();
     }
 }
 ```
@@ -120,10 +127,14 @@ public class WriteToTable {
 ## Read from Table
 
 ```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
 public class ReadFromTable {
@@ -131,15 +142,22 @@ public class ReadFromTable {
     public static void readFrom() throws Exception {
         // create environments of both APIs
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
-        // create paimon catalog
-        tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 
'warehouse'='...')");
-        tableEnv.executeSql("USE CATALOG paimon");
+        // get table from catalog
+        Options catalogOptions = new Options();
+        catalogOptions.set("warehouse", "/path/to/warehouse");
+        Catalog catalog = 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        Table table = catalog.getTable(Identifier.create("my_db", "T"));
+
+        FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);
+        
+        // builder.sourceBounded(true);
+        // builder.projection(...);
+        // builder.predicate(...);
+        // builder.limit(...);
+        // builder.sourceParallelism(...);
 
-        // convert to DataStream
-        Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table");
-        DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
+        DataStream<Row> dataStream = builder.buildForRow();
 
         // use this datastream
         dataStream.executeAndCollect().forEachRemaining(System.out::println);
@@ -204,29 +222,15 @@ public class WriteCdcToTable {
         catalogOptions.set("warehouse", "/path/to/warehouse");
         Catalog.Loader catalogLoader = 
                 () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-        
-        new RichCdcSinkBuilder()
-                .withInput(dataStream)
-                .withTable(createTableIfNotExists(catalogLoader.load(), 
identifier))
-                .withIdentifier(identifier)
-                .withCatalogLoader(catalogLoader)
+        Table table = catalogLoader.load().getTable(identifier);
+
+        new RichCdcSinkBuilder(table)
+                .forRichCdcRecord(dataStream)
+                .identifier(identifier)
+                .catalogLoader(catalogLoader)
                 .build();
 
         env.execute();
     }
-
-    private static Table createTableIfNotExists(Catalog catalog, Identifier 
identifier) throws Exception {
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.primaryKey("order_id");
-        schemaBuilder.column("order_id", DataTypes.BIGINT());
-        schemaBuilder.column("price", DataTypes.DOUBLE());
-        Schema schema = schemaBuilder.build();
-        try {
-            catalog.createTable(identifier, schema, false);
-        } catch (Catalog.TableAlreadyExistException e) {
-            // do something
-        }
-        return catalog.getTable(identifier);
-    }
 }
 ```
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 774895757..610856d3a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.annotation.Public;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
@@ -28,8 +28,12 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 
 import javax.annotation.Nullable;
 
-/** Builder for sink when syncing {@link RichCdcRecord} records into one 
Paimon table. */
-@Experimental
+/**
+ * DataStream API for building Flink Sink for {@link RichCdcRecord} to write 
with schema evolution.
+ *
+ * @since 0.8
+ */
+@Public
 public class RichCdcSinkBuilder {
 
     private DataStream<RichCdcRecord> input = null;
@@ -39,27 +43,26 @@ public class RichCdcSinkBuilder {
 
     @Nullable private Integer parallelism;
 
-    public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
-        this.input = input;
-        return this;
+    public RichCdcSinkBuilder(Table table) {
+        this.table = table;
     }
 
-    public RichCdcSinkBuilder withTable(Table table) {
-        this.table = table;
+    public RichCdcSinkBuilder forRichCdcRecord(DataStream<RichCdcRecord> 
input) {
+        this.input = input;
         return this;
     }
 
-    public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
-        this.parallelism = parallelism;
+    public RichCdcSinkBuilder identifier(Identifier identifier) {
+        this.identifier = identifier;
         return this;
     }
 
-    public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
-        this.identifier = identifier;
+    public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) {
+        this.parallelism = parallelism;
         return this;
     }
 
-    public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+    public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
         this.catalogLoader = catalogLoader;
         return this;
     }
@@ -74,4 +77,45 @@ public class RichCdcSinkBuilder {
                 .withCatalogLoader(catalogLoader)
                 .build();
     }
+
+    // ====================== Deprecated ============================
+
+    /** @deprecated Use constructor to pass table. */
+    @Deprecated
+    public RichCdcSinkBuilder() {}
+
+    /** @deprecated Use {@link #forRichCdcRecord}. */
+    @Deprecated
+    public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
+        this.input = input;
+        return this;
+    }
+
+    /** @deprecated Use constructor to pass Table. */
+    @Deprecated
+    public RichCdcSinkBuilder withTable(Table table) {
+        this.table = table;
+        return this;
+    }
+
+    /** @deprecated Use {@link #parallelism}. */
+    @Deprecated
+    public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
+        this.parallelism = parallelism;
+        return this;
+    }
+
+    /** @deprecated Use {@link #identifier}. */
+    @Deprecated
+    public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
+        this.identifier = identifier;
+        return this;
+    }
+
+    /** @deprecated Use {@link #catalogLoader}. */
+    @Deprecated
+    public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+        this.catalogLoader = catalogLoader;
+        return this;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index f49be619a..0936943ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
 import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
 import org.apache.paimon.predicate.Predicate;
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -85,12 +84,13 @@ public class SortCompactAction extends CompactAction {
         }
         Map<String, String> tableConfig = fileStoreTable.options();
         FlinkSourceBuilder sourceBuilder =
-                new FlinkSourceBuilder(
-                        ObjectIdentifier.of(
-                                catalogName,
-                                identifier.getDatabaseName(),
-                                identifier.getObjectName()),
-                        fileStoreTable);
+                new FlinkSourceBuilder(fileStoreTable)
+                        .sourceName(
+                                ObjectIdentifier.of(
+                                                catalogName,
+                                                identifier.getDatabaseName(),
+                                                identifier.getObjectName())
+                                        .asSummaryString());
 
         if (getPartitions() != null) {
             Predicate partitionPredicate =
@@ -98,22 +98,22 @@ public class SortCompactAction extends CompactAction {
                             getPartitions().stream()
                                     .map(p -> PredicateBuilder.partition(p, 
table.rowType()))
                                     .toArray(Predicate[]::new));
-            sourceBuilder.withPredicate(partitionPredicate);
+            sourceBuilder.predicate(partitionPredicate);
         }
 
         String scanParallelism = 
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
         if (scanParallelism != null) {
-            sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
+            sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism));
         }
 
-        DataStream<RowData> source = 
sourceBuilder.withEnv(env).withContinuousMode(false).build();
+        DataStream<RowData> source = 
sourceBuilder.env(env).sourceBounded(true).build();
         TableSorter sorter =
                 TableSorter.getSorter(env, source, fileStoreTable, 
sortStrategy, orderColumns);
 
-        new FlinkSinkBuilder(fileStoreTable)
-                .withInput(sorter.sort())
+        new SortCompactSinkBuilder(fileStoreTable)
                 .forCompact(true)
-                .withOverwritePartition(new HashMap<>())
+                .forRowData(sorter.sort())
+                .overwrite()
                 .build();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index f10f3d625..6a9793f44 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -61,7 +61,7 @@ public abstract class TableActionBase extends ActionBase {
         List<Transformation<?>> transformations =
                 Collections.singletonList(
                         new FlinkSinkBuilder((FileStoreTable) table)
-                                .withInput(dataStream)
+                                .forRowData(dataStream)
                                 .build()
                                 .getTransformation());
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 97c426ee5..f7ebf5afd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -187,9 +187,7 @@ public abstract class FlinkSink<T> implements Serializable {
     public DataStream<Committable> doWrite(
             DataStream<T> input, String commitUser, @Nullable Integer 
parallelism) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        boolean isStreaming =
-                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
-                        == RuntimeExecutionMode.STREAMING;
+        boolean isStreaming = isStreaming(input);
 
         boolean writeOnly = table.coreOptions().writeOnly();
         SingleOutputStreamOperator<Committable> written =
@@ -221,10 +219,8 @@ public abstract class FlinkSink<T> implements Serializable 
{
         StreamExecutionEnvironment env = written.getExecutionEnvironment();
         ReadableConfig conf = env.getConfiguration();
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         boolean streamingCheckpointEnabled =
-                isStreaming && checkpointConfig.isCheckpointingEnabled();
+                isStreaming(written) && 
checkpointConfig.isCheckpointingEnabled();
         if (streamingCheckpointEnabled) {
             assertStreamingConfiguration(env);
         }
@@ -313,4 +309,13 @@ public abstract class FlinkSink<T> implements Serializable 
{
             boolean streamingCheckpointEnabled);
 
     protected abstract CommittableStateManager<ManifestCommittable> 
createCommittableStateManager();
+
+    public static boolean isStreaming(DataStream<?> input) {
+        return isStreaming(input.getExecutionEnvironment());
+    }
+
+    public static boolean isStreaming(StreamExecutionEnvironment env) {
+        return env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                == RuntimeExecutionMode.STREAMING;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index b5de897b9..3117f5e87 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,77 +18,108 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Builder for {@link FlinkSink}. */
+/**
+ * DataStream API for building Flink Sink.
+ *
+ * @since 0.8
+ */
+@Public
 public class FlinkSinkBuilder {
 
     private final FileStoreTable table;
 
     private DataStream<RowData> input;
     @Nullable private Map<String, String> overwritePartition;
-    @Nullable private LogSinkFunction logSinkFunction;
     @Nullable private Integer parallelism;
-    private boolean boundedInput = false;
-    private boolean compactSink = false;
+    private Boolean boundedInput = null;
 
-    public FlinkSinkBuilder(FileStoreTable table) {
-        this.table = table;
-    }
+    // ============== for extension ==============
 
-    public FlinkSinkBuilder withInput(DataStream<RowData> input) {
-        this.input = input;
-        return this;
+    protected boolean compactSink = false;
+    @Nullable protected LogSinkFunction logSinkFunction;
+
+    public FlinkSinkBuilder(Table table) {
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException("Unsupported table type: " 
+ table);
+        }
+        this.table = (FileStoreTable) table;
     }
 
     /**
-     * Whether we need to overwrite partitions.
-     *
-     * @param overwritePartition If we pass null, it means not overwrite. If 
we pass an empty map,
-     *     it means to overwrite every partition it received. If we pass a 
non-empty map, it means
-     *     we only overwrite the partitions match the map.
-     * @return returns this.
+     * From {@link DataStream} with {@link Row}, need to provide a {@link 
DataType} for builder to
+     * convert those {@link Row}s to a {@link RowData} DataStream.
      */
-    public FlinkSinkBuilder withOverwritePartition(
-            @Nullable Map<String, String> overwritePartition) {
-        this.overwritePartition = overwritePartition;
+    public FlinkSinkBuilder forRow(DataStream<Row> input, DataType 
rowDataType) {
+        RowType rowType = (RowType) rowDataType.getLogicalType();
+        DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new 
DataType[0]);
+
+        DataFormatConverters.RowConverter converter =
+                new DataFormatConverters.RowConverter(fieldDataTypes);
+        this.input =
+                input.map((MapFunction<Row, RowData>) converter::toInternal)
+                        .setParallelism(input.getParallelism())
+                        .returns(InternalTypeInfo.of(rowType));
         return this;
     }
 
-    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction 
logSinkFunction) {
-        this.logSinkFunction = logSinkFunction;
+    /** From {@link DataStream} with {@link RowData}. */
+    public FlinkSinkBuilder forRowData(DataStream<RowData> input) {
+        this.input = input;
         return this;
     }
 
-    public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
-        this.parallelism = parallelism;
+    /** INSERT OVERWRITE. */
+    public FlinkSinkBuilder overwrite() {
+        return overwrite(new HashMap<>());
+    }
+
+    /** INSERT OVERWRITE PARTITION (...). */
+    public FlinkSinkBuilder overwrite(Map<String, String> overwritePartition) {
+        this.overwritePartition = overwritePartition;
         return this;
     }
 
-    public FlinkSinkBuilder withBoundedInputStream(boolean bounded) {
-        this.boundedInput = bounded;
+    /** Set sink parallelism. */
+    public FlinkSinkBuilder parallelism(int parallelism) {
+        this.parallelism = parallelism;
         return this;
     }
 
-    public FlinkSinkBuilder forCompact(boolean compactSink) {
-        this.compactSink = compactSink;
+    /**
+     * Set input bounded, if it is bounded, append table sink does not 
generate a topology for
+     * merging small files.
+     */
+    public FlinkSinkBuilder inputBounded(boolean bounded) {
+        this.boundedInput = bounded;
         return this;
     }
 
+    /** Build {@link DataStreamSink}. */
     public DataStreamSink<?> build() {
         DataStream<InternalRow> input = MapToInternalRow.map(this.input, 
table.rowType());
         if (table.coreOptions().localMergeEnabled() && 
table.schema().primaryKeys().size() > 0) {
@@ -143,6 +174,9 @@ public class FlinkSinkBuilder {
         checkArgument(
                 table.primaryKeys().isEmpty(),
                 "Unaware bucket mode only works with append-only table for 
now.");
+        if (boundedInput == null) {
+            boundedInput = !FlinkSink.isStreaming(input);
+        }
         return new RowUnawareBucketSink(
                         table, overwritePartition, logSinkFunction, 
parallelism, boundedInput)
                 .sinkFrom(input);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 30039188d..b6a944703 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -22,12 +22,10 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
 import org.apache.paimon.flink.log.LogSinkProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,6 +45,7 @@ import java.util.Map;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
 
 /** Table sink to create sink. */
 public abstract class FlinkTableSinkBase
@@ -125,17 +124,20 @@ public abstract class FlinkTableSinkBase
         final LogSinkFunction logSinkFunction =
                 overwrite ? null : (logSinkProvider == null ? null : 
logSinkProvider.createSink());
         return new PaimonDataStreamSinkProvider(
-                (dataStream) ->
-                        new FlinkSinkBuilder((FileStoreTable) table)
-                                .withInput(
-                                        new DataStream<>(
-                                                
dataStream.getExecutionEnvironment(),
-                                                
dataStream.getTransformation()))
-                                .withLogSinkFunction(logSinkFunction)
-                                .withOverwritePartition(overwrite ? 
staticPartitions : null)
-                                
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
-                                .withBoundedInputStream(context.isBounded())
-                                .build());
+                (dataStream) -> {
+                    LogFlinkSinkBuilder builder = new 
LogFlinkSinkBuilder(table);
+                    builder.logSinkFunction(logSinkFunction)
+                            .forRowData(
+                                    new DataStream<>(
+                                            
dataStream.getExecutionEnvironment(),
+                                            dataStream.getTransformation()))
+                            .inputBounded(context.isBounded());
+                    if (overwrite) {
+                        builder.overwrite(staticPartitions);
+                    }
+                    
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
+                    return builder.build();
+                });
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java
new file mode 100644
index 000000000..aa64b3e35
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.Table;
+
+import javax.annotation.Nullable;
+
+/** A special version of {@link FlinkSinkBuilder} with a log sink. */
+public class LogFlinkSinkBuilder extends FlinkSinkBuilder {
+
+    public LogFlinkSinkBuilder(Table table) {
+        super(table);
+    }
+
+    FlinkSinkBuilder logSinkFunction(@Nullable LogSinkFunction 
logSinkFunction) {
+        this.logSinkFunction = logSinkFunction;
+        return this;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java
new file mode 100644
index 000000000..c30ebc824
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.Table;
+
+/** A special version of {@link FlinkSinkBuilder} for sort compact. */
+public class SortCompactSinkBuilder extends FlinkSinkBuilder {
+
+    public SortCompactSinkBuilder(Table table) {
+        super(table);
+    }
+
+    public FlinkSinkBuilder forCompact(boolean compactSink) {
+        this.compactSink = compactSink;
+        return this;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index ff0312ee5..c4544426f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -190,21 +190,22 @@ public class DataTableSource extends FlinkTableSource {
         }
 
         FlinkSourceBuilder sourceBuilder =
-                new FlinkSourceBuilder(tableIdentifier, table)
-                        .withContinuousMode(streaming)
-                        .withLogSourceProvider(logSourceProvider)
-                        .withProjection(projectFields)
-                        .withPredicate(predicate)
-                        .withLimit(limit)
-                        .withWatermarkStrategy(watermarkStrategy)
-                        
.withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
+                new FlinkSourceBuilder(table)
+                        .sourceName(tableIdentifier.asSummaryString())
+                        .sourceBounded(!streaming)
+                        .logSourceProvider(logSourceProvider)
+                        .projection(projectFields)
+                        .predicate(predicate)
+                        .limit(limit)
+                        .watermarkStrategy(watermarkStrategy)
+                        
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
 
         return new PaimonDataStreamScanProvider(
                 !streaming,
                 env ->
                         sourceBuilder
-                                .withParallelism(inferSourceParallelism(env))
-                                .withEnv(env)
+                                .sourceParallelism(inferSourceParallelism(env))
+                                .env(env)
                                 .build());
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index d8878f360..6b542f71c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -24,6 +24,7 @@ import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.Projection;
 import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.sink.FlinkSink;
 import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
 import org.apache.paimon.flink.source.operator.MonitorFunction;
 import org.apache.paimon.flink.utils.TableScanUtils;
@@ -35,6 +36,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -45,33 +47,38 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Optional;
 
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
- * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
- * ContinuousFileStoreSource}. This is for normal read/write jobs.
+ * DataStream API for building Flink Source.
+ *
+ * @since 0.8
  */
 public class FlinkSourceBuilder {
 
-    private final ObjectIdentifier tableIdentifier;
     private final Table table;
     private final Options conf;
 
-    private boolean isContinuous = false;
+    private String sourceName;
+    private Boolean sourceBounded;
     private StreamExecutionEnvironment env;
     @Nullable private int[][] projectedFields;
     @Nullable private Predicate predicate;
@@ -81,54 +88,61 @@ public class FlinkSourceBuilder {
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
     @Nullable private DynamicPartitionFilteringInfo 
dynamicPartitionFilteringInfo;
 
-    public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
-        this.tableIdentifier = tableIdentifier;
+    public FlinkSourceBuilder(Table table) {
         this.table = table;
+        this.sourceName = table.name();
         this.conf = Options.fromMap(table.options());
     }
 
-    public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
-        this.isContinuous = isContinuous;
+    public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
+        this.env = env;
+        if (sourceBounded == null) {
+            sourceBounded = !FlinkSink.isStreaming(env);
+        }
         return this;
     }
 
-    public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) {
-        this.env = env;
+    public FlinkSourceBuilder sourceName(String name) {
+        this.sourceName = name;
         return this;
     }
 
-    public FlinkSourceBuilder withProjection(int[][] projectedFields) {
-        this.projectedFields = projectedFields;
+    public FlinkSourceBuilder sourceBounded(boolean bounded) {
+        this.sourceBounded = bounded;
         return this;
     }
 
-    public FlinkSourceBuilder withPredicate(Predicate predicate) {
-        this.predicate = predicate;
+    public FlinkSourceBuilder projection(int[] projectedFields) {
+        return projection(Projection.of(projectedFields).toNestedIndexes());
+    }
+
+    public FlinkSourceBuilder projection(int[][] projectedFields) {
+        this.projectedFields = projectedFields;
         return this;
     }
 
-    public FlinkSourceBuilder withLimit(@Nullable Long limit) {
-        this.limit = limit;
+    public FlinkSourceBuilder predicate(Predicate predicate) {
+        this.predicate = predicate;
         return this;
     }
 
-    public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider 
logSourceProvider) {
-        this.logSourceProvider = logSourceProvider;
+    public FlinkSourceBuilder limit(@Nullable Long limit) {
+        this.limit = limit;
         return this;
     }
 
-    public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) {
+    public FlinkSourceBuilder sourceParallelism(@Nullable Integer parallelism) 
{
         this.parallelism = parallelism;
         return this;
     }
 
-    public FlinkSourceBuilder withWatermarkStrategy(
+    public FlinkSourceBuilder watermarkStrategy(
             @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
         return this;
     }
 
-    public FlinkSourceBuilder withDynamicPartitionFilteringFields(
+    public FlinkSourceBuilder dynamicPartitionFilteringFields(
             List<String> dynamicPartitionFilteringFields) {
         if (dynamicPartitionFilteringFields != null && 
!dynamicPartitionFilteringFields.isEmpty()) {
             checkState(
@@ -144,6 +158,12 @@ public class FlinkSourceBuilder {
         return this;
     }
 
+    @Deprecated
+    FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) {
+        this.logSourceProvider = logSourceProvider;
+        return this;
+    }
+
     private ReadBuilder createReadBuilder() {
         ReadBuilder readBuilder =
                 
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
@@ -194,7 +214,7 @@ public class FlinkSourceBuilder {
                         watermarkStrategy == null
                                 ? WatermarkStrategy.noWatermarks()
                                 : watermarkStrategy,
-                        tableIdentifier.asSummaryString(),
+                        sourceName,
                         produceTypeInfo());
         if (parallelism != null) {
             dataStream.setParallelism(parallelism);
@@ -212,44 +232,58 @@ public class FlinkSourceBuilder {
         return InternalTypeInfo.of(produceType);
     }
 
+    /** Build source {@link DataStream} with {@link Row}. */
+    public DataStream<Row> buildForRow() {
+        DataType rowType = 
fromLogicalToDataType(toLogicalType(table.rowType()));
+        DataType[] fieldDataTypes = rowType.getChildren().toArray(new 
DataType[0]);
+
+        DataFormatConverters.RowConverter converter =
+                new DataFormatConverters.RowConverter(fieldDataTypes);
+        DataStream<RowData> source = build();
+        return source.map((MapFunction<RowData, Row>) converter::toExternal)
+                .setParallelism(source.getParallelism())
+                .returns(ExternalTypeInfo.of(rowType));
+    }
+
+    /** Build source {@link DataStream} with {@link RowData}. */
     public DataStream<RowData> build() {
         if (env == null) {
             throw new IllegalArgumentException("StreamExecutionEnvironment 
should not be null.");
         }
 
-        if (isContinuous) {
-            TableScanUtils.streamingReadingValidate(table);
-
-            // TODO visit all options through CoreOptions
-            StartupMode startupMode = CoreOptions.startupMode(conf);
-            StreamingReadMode streamingReadMode = 
CoreOptions.streamReadType(conf);
-
-            if (logSourceProvider != null && streamingReadMode != FILE) {
-                if (startupMode != StartupMode.LATEST_FULL) {
-                    return toDataStream(logSourceProvider.createSource(null));
-                } else {
-                    return toDataStream(
-                            HybridSource.<RowData, 
StaticFileStoreSplitEnumerator>builder(
-                                            
LogHybridSourceFactory.buildHybridFirstSource(
-                                                    table, projectedFields, 
predicate))
-                                    .addSource(
-                                            new 
LogHybridSourceFactory(logSourceProvider),
-                                            Boundedness.CONTINUOUS_UNBOUNDED)
-                                    .build());
-                }
+        if (sourceBounded) {
+            return buildStaticFileSource();
+        }
+
+        TableScanUtils.streamingReadingValidate(table);
+
+        // TODO visit all options through CoreOptions
+        StartupMode startupMode = CoreOptions.startupMode(conf);
+        StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);
+
+        if (logSourceProvider != null && streamingReadMode != FILE) {
+            if (startupMode != StartupMode.LATEST_FULL) {
+                return toDataStream(logSourceProvider.createSource(null));
             } else {
-                if 
(conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
-                    return buildAlignedContinuousFileSource();
-                } else if (conf.contains(CoreOptions.CONSUMER_ID)
-                        && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
-                                == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
-                    return buildContinuousStreamOperator();
-                } else {
-                    return buildContinuousFileSource();
-                }
+                return toDataStream(
+                        HybridSource.<RowData, 
StaticFileStoreSplitEnumerator>builder(
+                                        
LogHybridSourceFactory.buildHybridFirstSource(
+                                                table, projectedFields, 
predicate))
+                                .addSource(
+                                        new 
LogHybridSourceFactory(logSourceProvider),
+                                        Boundedness.CONTINUOUS_UNBOUNDED)
+                                .build());
             }
         } else {
-            return buildStaticFileSource();
+            if 
(conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
+                return buildAlignedContinuousFileSource();
+            } else if (conf.contains(CoreOptions.CONSUMER_ID)
+                    && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
+                            == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
+                return buildContinuousStreamOperator();
+            } else {
+                return buildContinuousFileSource();
+            }
         }
     }
 
@@ -262,7 +296,7 @@ public class FlinkSourceBuilder {
         dataStream =
                 MonitorFunction.buildSource(
                         env,
-                        tableIdentifier.asSummaryString(),
+                        sourceName,
                         produceTypeInfo(),
                         createReadBuilder(),
                         
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
@@ -276,7 +310,7 @@ public class FlinkSourceBuilder {
         return dataStream;
     }
 
-    public void 
assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
+    private void 
assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
         checkArgument(
                 checkpointConfig.isCheckpointingEnabled(),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 6b68532a2..dff589e92 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -35,12 +35,14 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.FailingFileIO;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.GenericRowData;
@@ -50,6 +52,7 @@ import 
org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -135,17 +138,54 @@ public class FileStoreITCase extends AbstractTestBase {
         return new SerializableRowData(row, 
InternalSerializers.create(TABLE_TYPE));
     }
 
+    @TestTemplate
+    public void testRowSourceSink() throws Exception {
+        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] 
{1, 2});
+
+        // write
+        DataStreamSource<RowData> source = buildTestSource(env, isBatch);
+        DataStream<Row> input =
+                source.map(
+                                (MapFunction<RowData, Row>)
+                                        r ->
+                                                Row.of(
+                                                        r.getInt(0),
+                                                        
r.getString(1).toString(),
+                                                        r.getInt(2)))
+                        .setParallelism(source.getParallelism());
+        DataType inputType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("v", DataTypes.INT()),
+                        DataTypes.FIELD("p", DataTypes.STRING()),
+                        DataTypes.FIELD("_k", DataTypes.INT()));
+        new FlinkSinkBuilder(table).forRow(input, inputType).build();
+        env.execute();
+
+        // read
+        List<Row> results =
+                executeAndCollectRow(
+                        new 
FlinkSourceBuilder(table).env(env).sourceBounded(true).buildForRow());
+
+        // assert
+        Row[] expected =
+                new Row[] {
+                    Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 
1), Row.of(0, "p1", 2)
+                };
+        assertThat(results).containsExactlyInAnyOrder(expected);
+    }
+
     @TestTemplate
     public void testPartitioned() throws Exception {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] 
{1, 2});
 
         // write
-        new FlinkSinkBuilder(table).withInput(buildTestSource(env, 
isBatch)).build();
+        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, 
isBatch)).build();
         env.execute();
 
         // read
         List<Row> results =
-                executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build());
+                executeAndCollect(
+                        new 
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
 
         // assert
         Row[] expected =
@@ -160,12 +200,13 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
 
         // write
-        new FlinkSinkBuilder(table).withInput(buildTestSource(env, 
isBatch)).build();
+        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, 
isBatch)).build();
         env.execute();
 
         // read
         List<Row> results =
-                executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build());
+                executeAndCollect(
+                        new 
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
 
         // assert
         Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), 
Row.of(3, "p2", 5)};
@@ -179,7 +220,7 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] 
{1, 2});
 
         // write
-        new FlinkSinkBuilder(table).withInput(buildTestSource(env, 
isBatch)).build();
+        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, 
isBatch)).build();
         env.execute();
 
         // overwrite p2
@@ -190,15 +231,13 @@ public class FileStoreITCase extends AbstractTestBase {
                         InternalTypeInfo.of(TABLE_TYPE));
         Map<String, String> overwrite = new HashMap<>();
         overwrite.put("p", "p2");
-        new FlinkSinkBuilder(table)
-                .withInput(partialData)
-                .withOverwritePartition(overwrite)
-                .build();
+        new 
FlinkSinkBuilder(table).forRowData(partialData).overwrite(overwrite).build();
         env.execute();
 
         // read
         List<Row> results =
-                executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build());
+                executeAndCollect(
+                        new 
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
 
         Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), 
Row.of(0, "p1", 2)};
         assertThat(results).containsExactlyInAnyOrder(expected);
@@ -209,14 +248,13 @@ public class FileStoreITCase extends AbstractTestBase {
                         Collections.singletonList(
                                 wrap(GenericRowData.of(19, 
StringData.fromString("p2"), 6))),
                         InternalTypeInfo.of(TABLE_TYPE));
-        new FlinkSinkBuilder(table)
-                .withInput(partialData)
-                .withOverwritePartition(new HashMap<>())
-                .build();
+        new 
FlinkSinkBuilder(table).forRowData(partialData).overwrite().build();
         env.execute();
 
         // read
-        results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build());
+        results =
+                executeAndCollect(
+                        new 
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
         expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), 
Row.of(0, "p1", 2)};
         assertThat(results).containsExactlyInAnyOrder(expected);
 
@@ -230,13 +268,15 @@ public class FileStoreITCase extends AbstractTestBase {
                         table.copy(
                                 Collections.singletonMap(
                                         
CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false")))
-                .withInput(partialData)
-                .withOverwritePartition(new HashMap<>())
+                .forRowData(partialData)
+                .overwrite()
                 .build();
         env.execute();
 
         // read
-        results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build());
+        results =
+                executeAndCollect(
+                        new 
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
         expected = new Row[] {Row.of(20, "p2", 3)};
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
@@ -246,12 +286,13 @@ public class FileStoreITCase extends AbstractTestBase {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]);
 
         // write
-        new FlinkSinkBuilder(table).withInput(buildTestSource(env, 
isBatch)).build();
+        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, 
isBatch)).build();
         env.execute();
 
         // read
         List<Row> results =
-                executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, 
table).withEnv(env).build());
+                executeAndCollect(
+                        new 
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
 
         // assert
         // in streaming mode, expect origin data X 2 (FiniteTestSource)
@@ -275,7 +316,7 @@ public class FileStoreITCase extends AbstractTestBase {
 
     private void testProjection(FileStoreTable table) throws Exception {
         // write
-        new FlinkSinkBuilder(table).withInput(buildTestSource(env, 
isBatch)).build();
+        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, 
isBatch)).build();
         env.execute();
 
         // read
@@ -288,9 +329,10 @@ public class FileStoreITCase extends AbstractTestBase {
                                         projection.project(TABLE_TYPE)));
         List<Row> results =
                 executeAndCollect(
-                        new FlinkSourceBuilder(IDENTIFIER, table)
-                                .withProjection(projection.toNestedIndexes())
-                                .withEnv(env)
+                        new FlinkSourceBuilder(table)
+                                .sourceBounded(true)
+                                .projection(projection.toNestedIndexes())
+                                .env(env)
                                 .build(),
                         converter);
 
@@ -328,10 +370,7 @@ public class FileStoreITCase extends AbstractTestBase {
                 table.copy(
                         
Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024"));
         DataStream<RowData> source =
-                new FlinkSourceBuilder(IDENTIFIER, table)
-                        .withContinuousMode(true)
-                        .withEnv(env)
-                        .build();
+                new 
FlinkSourceBuilder(table).sourceBounded(false).env(env).build();
         Transformation<RowData> transformation = source.getTransformation();
         assertThat(transformation).isInstanceOf(SourceTransformation.class);
         assertThat(((SourceTransformation<?, ?, ?>) 
transformation).getSource().getBoundedness())
@@ -343,9 +382,9 @@ public class FileStoreITCase extends AbstractTestBase {
 
         BlockingIterator<RowData, Row> iterator =
                 BlockingIterator.of(
-                        new FlinkSourceBuilder(IDENTIFIER, table)
-                                .withContinuousMode(true)
-                                .withEnv(env)
+                        new FlinkSourceBuilder(table)
+                                .sourceBounded(false)
+                                .env(env)
                                 .build()
                                 .executeAndCollect(),
                         CONVERTER::toExternal);
@@ -382,7 +421,7 @@ public class FileStoreITCase extends AbstractTestBase {
         }
         DataStreamSource<RowData> source =
                 env.addSource(new FiniteTestSource<>(src, true), 
InternalTypeInfo.of(TABLE_TYPE));
-        new FlinkSinkBuilder(table).withInput(source).build();
+        new FlinkSinkBuilder(table).forRowData(source).build();
         env.execute();
         
assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
     }
@@ -468,4 +507,14 @@ public class FileStoreITCase extends AbstractTestBase {
         iterator.close();
         return results;
     }
+
+    public static List<Row> executeAndCollectRow(DataStream<Row> source) 
throws Exception {
+        CloseableIterator<Row> iterator = source.executeAndCollect();
+        List<Row> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            results.add(iterator.next());
+        }
+        iterator.close();
+        return results;
+    }
 }

Reply via email to