This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ffb903287 [api] Publish DataStream Sink & Source API (#3192)
ffb903287 is described below
commit ffb903287d38035550255ab69a5ad820ee4785ff
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 15 16:51:25 2024 +0800
[api] Publish DataStream Sink & Source API (#3192)
---
docs/content/program-api/flink-api.md | 114 ++++++++--------
.../paimon/flink/sink/cdc/RichCdcSinkBuilder.java | 70 ++++++++--
.../paimon/flink/action/SortCompactAction.java | 28 ++--
.../paimon/flink/action/TableActionBase.java | 2 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 17 ++-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 88 +++++++++----
.../paimon/flink/sink/FlinkTableSinkBase.java | 28 ++--
.../paimon/flink/sink/LogFlinkSinkBuilder.java | 36 ++++++
.../paimon/flink/sink/SortCompactSinkBuilder.java | 34 +++++
.../paimon/flink/source/DataTableSource.java | 21 +--
.../paimon/flink/source/FlinkSourceBuilder.java | 144 +++++++++++++--------
.../org/apache/paimon/flink/FileStoreITCase.java | 113 +++++++++++-----
12 files changed, 469 insertions(+), 226 deletions(-)
diff --git a/docs/content/program-api/flink-api.md
b/docs/content/program-api/flink-api.md
index 942208e1a..746258629 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -61,58 +61,65 @@ Please choose your Flink version.
Paimon relies on Hadoop environment, you should add hadoop classpath or
bundled jar.
-Paimon does not provide a DataStream API, but you can read or write to Paimon
tables by the conversion between DataStream and Table in Flink.
+In addition to the DataStream API, you can also read or write to Paimon tables by the
conversion between DataStream and Table in Flink.
See [DataStream API
Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/).
## Write to Table
```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
public class WriteToTable {
- public static void writeTo() {
+ public static void writeTo() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// for CONTINUOUS_UNBOUNDED source, set checkpoint interval
// env.enableCheckpointing(60_000);
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// create a changelog DataStream
- DataStream<Row> dataStream =
+ DataStream<Row> input =
env.fromElements(
- Row.ofKind(RowKind.INSERT, "Alice", 12),
- Row.ofKind(RowKind.INSERT, "Bob", 5),
- Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
- Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
+ Row.ofKind(RowKind.INSERT, "Alice", 12),
+ Row.ofKind(RowKind.INSERT, "Bob", 5),
+ Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
+ Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
.returns(
Types.ROW_NAMED(
- new String[] {"name", "age"},
- Types.STRING, Types.INT));
+ new String[] {"name", "age"},
Types.STRING, Types.INT));
- // interpret the DataStream as a Table
- Schema schema = Schema.newBuilder()
- .column("name", DataTypes.STRING())
- .column("age", DataTypes.INT())
- .build();
- Table table = tableEnv.fromChangelogStream(dataStream, schema);
+ // get table from catalog
+ Options catalogOptions = new Options();
+ catalogOptions.set("warehouse", "/path/to/warehouse");
+ Catalog catalog =
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ Table table = catalog.getTable(Identifier.create("my_db", "T"));
- // create paimon catalog
- tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon',
'warehouse'='...')");
- tableEnv.executeSql("USE CATALOG paimon");
+ DataType inputType =
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()));
+ FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input,
inputType);
- // register the table under a name and perform an aggregation
- tableEnv.createTemporaryView("InputTable", table);
+ // set sink parallelism
+ // builder.parallelism(_your_parallelism)
- // insert into paimon table from your data stream table
- tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM
InputTable");
+ // set overwrite mode
+ // builder.overwrite(...)
+
+ builder.build();
+ env.execute();
}
}
```
@@ -120,10 +127,14 @@ public class WriteToTable {
## Read from Table
```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class ReadFromTable {
@@ -131,15 +142,22 @@ public class ReadFromTable {
public static void readFrom() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- // create paimon catalog
- tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon',
'warehouse'='...')");
- tableEnv.executeSql("USE CATALOG paimon");
+ // get table from catalog
+ Options catalogOptions = new Options();
+ catalogOptions.set("warehouse", "/path/to/warehouse");
+ Catalog catalog =
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ Table table = catalog.getTable(Identifier.create("my_db", "T"));
+
+ FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);
+
+ // builder.sourceBounded(true);
+ // builder.projection(...);
+ // builder.predicate(...);
+ // builder.limit(...);
+ // builder.sourceParallelism(...);
- // convert to DataStream
- Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table");
- DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
+ DataStream<Row> dataStream = builder.buildForRow();
// use this datastream
dataStream.executeAndCollect().forEachRemaining(System.out::println);
@@ -204,29 +222,15 @@ public class WriteCdcToTable {
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog.Loader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-
- new RichCdcSinkBuilder()
- .withInput(dataStream)
- .withTable(createTableIfNotExists(catalogLoader.load(),
identifier))
- .withIdentifier(identifier)
- .withCatalogLoader(catalogLoader)
+ Table table = catalogLoader.load().getTable(identifier);
+
+ new RichCdcSinkBuilder(table)
+ .forRichCdcRecord(dataStream)
+ .identifier(identifier)
+ .catalogLoader(catalogLoader)
.build();
env.execute();
}
-
- private static Table createTableIfNotExists(Catalog catalog, Identifier
identifier) throws Exception {
- Schema.Builder schemaBuilder = Schema.newBuilder();
- schemaBuilder.primaryKey("order_id");
- schemaBuilder.column("order_id", DataTypes.BIGINT());
- schemaBuilder.column("price", DataTypes.DOUBLE());
- Schema schema = schemaBuilder.build();
- try {
- catalog.createTable(identifier, schema, false);
- } catch (Catalog.TableAlreadyExistException e) {
- // do something
- }
- return catalog.getTable(identifier);
- }
}
```
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 774895757..610856d3a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
@@ -28,8 +28,12 @@ import
org.apache.flink.streaming.api.datastream.DataStreamSink;
import javax.annotation.Nullable;
-/** Builder for sink when syncing {@link RichCdcRecord} records into one
Paimon table. */
-@Experimental
+/**
+ * DataStream API for building Flink Sink for {@link RichCdcRecord} to write
with schema evolution.
+ *
+ * @since 0.8
+ */
+@Public
public class RichCdcSinkBuilder {
private DataStream<RichCdcRecord> input = null;
@@ -39,27 +43,26 @@ public class RichCdcSinkBuilder {
@Nullable private Integer parallelism;
- public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
- this.input = input;
- return this;
+ public RichCdcSinkBuilder(Table table) {
+ this.table = table;
}
- public RichCdcSinkBuilder withTable(Table table) {
- this.table = table;
+ public RichCdcSinkBuilder forRichCdcRecord(DataStream<RichCdcRecord>
input) {
+ this.input = input;
return this;
}
- public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
- this.parallelism = parallelism;
+ public RichCdcSinkBuilder identifier(Identifier identifier) {
+ this.identifier = identifier;
return this;
}
- public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
- this.identifier = identifier;
+ public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) {
+ this.parallelism = parallelism;
return this;
}
- public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+ public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
@@ -74,4 +77,45 @@ public class RichCdcSinkBuilder {
.withCatalogLoader(catalogLoader)
.build();
}
+
+ // ====================== Deprecated ============================
+
+ /** @deprecated Use constructor to pass table. */
+ @Deprecated
+ public RichCdcSinkBuilder() {}
+
+ /** @deprecated Use {@link #forRichCdcRecord}. */
+ @Deprecated
+ public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
+ this.input = input;
+ return this;
+ }
+
+ /** @deprecated Use constructor to pass Table. */
+ @Deprecated
+ public RichCdcSinkBuilder withTable(Table table) {
+ this.table = table;
+ return this;
+ }
+
+ /** @deprecated Use {@link #parallelism}. */
+ @Deprecated
+ public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ /** @deprecated Use {@link #identifier}. */
+ @Deprecated
+ public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
+ this.identifier = identifier;
+ return this;
+ }
+
+ /** @deprecated Use {@link #catalogLoader}. */
+ @Deprecated
+ public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+ this.catalogLoader = catalogLoader;
+ return this;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index f49be619a..0936943ed 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -85,12 +84,13 @@ public class SortCompactAction extends CompactAction {
}
Map<String, String> tableConfig = fileStoreTable.options();
FlinkSourceBuilder sourceBuilder =
- new FlinkSourceBuilder(
- ObjectIdentifier.of(
- catalogName,
- identifier.getDatabaseName(),
- identifier.getObjectName()),
- fileStoreTable);
+ new FlinkSourceBuilder(fileStoreTable)
+ .sourceName(
+ ObjectIdentifier.of(
+ catalogName,
+ identifier.getDatabaseName(),
+ identifier.getObjectName())
+ .asSummaryString());
if (getPartitions() != null) {
Predicate partitionPredicate =
@@ -98,22 +98,22 @@ public class SortCompactAction extends CompactAction {
getPartitions().stream()
.map(p -> PredicateBuilder.partition(p,
table.rowType()))
.toArray(Predicate[]::new));
- sourceBuilder.withPredicate(partitionPredicate);
+ sourceBuilder.predicate(partitionPredicate);
}
String scanParallelism =
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
- sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
+ sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism));
}
- DataStream<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(false).build();
+ DataStream<RowData> source =
sourceBuilder.env(env).sourceBounded(true).build();
TableSorter sorter =
TableSorter.getSorter(env, source, fileStoreTable,
sortStrategy, orderColumns);
- new FlinkSinkBuilder(fileStoreTable)
- .withInput(sorter.sort())
+ new SortCompactSinkBuilder(fileStoreTable)
.forCompact(true)
- .withOverwritePartition(new HashMap<>())
+ .forRowData(sorter.sort())
+ .overwrite()
.build();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index f10f3d625..6a9793f44 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -61,7 +61,7 @@ public abstract class TableActionBase extends ActionBase {
List<Transformation<?>> transformations =
Collections.singletonList(
new FlinkSinkBuilder((FileStoreTable) table)
- .withInput(dataStream)
+ .forRowData(dataStream)
.build()
.getTransformation());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 97c426ee5..f7ebf5afd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -187,9 +187,7 @@ public abstract class FlinkSink<T> implements Serializable {
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer
parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
- boolean isStreaming =
- env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
- == RuntimeExecutionMode.STREAMING;
+ boolean isStreaming = isStreaming(input);
boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
@@ -221,10 +219,8 @@ public abstract class FlinkSink<T> implements Serializable
{
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
- boolean isStreaming =
- conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
- isStreaming && checkpointConfig.isCheckpointingEnabled();
+ isStreaming(written) &&
checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}
@@ -313,4 +309,13 @@ public abstract class FlinkSink<T> implements Serializable
{
boolean streamingCheckpointEnabled);
protected abstract CommittableStateManager<ManifestCommittable>
createCommittableStateManager();
+
+ public static boolean isStreaming(DataStream<?> input) {
+ return isStreaming(input.getExecutionEnvironment());
+ }
+
+ public static boolean isStreaming(StreamExecutionEnvironment env) {
+ return env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index b5de897b9..3117f5e87 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,77 +18,108 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Builder for {@link FlinkSink}. */
+/**
+ * DataStream API for building Flink Sink.
+ *
+ * @since 0.8
+ */
+@Public
public class FlinkSinkBuilder {
private final FileStoreTable table;
private DataStream<RowData> input;
@Nullable private Map<String, String> overwritePartition;
- @Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
- private boolean boundedInput = false;
- private boolean compactSink = false;
+ private Boolean boundedInput = null;
- public FlinkSinkBuilder(FileStoreTable table) {
- this.table = table;
- }
+ // ============== for extension ==============
- public FlinkSinkBuilder withInput(DataStream<RowData> input) {
- this.input = input;
- return this;
+ protected boolean compactSink = false;
+ @Nullable protected LogSinkFunction logSinkFunction;
+
+ public FlinkSinkBuilder(Table table) {
+ if (!(table instanceof FileStoreTable)) {
+ throw new UnsupportedOperationException("Unsupported table type: "
+ table);
+ }
+ this.table = (FileStoreTable) table;
}
/**
- * Whether we need to overwrite partitions.
- *
- * @param overwritePartition If we pass null, it means not overwrite. If
we pass an empty map,
- * it means to overwrite every partition it received. If we pass a
non-empty map, it means
- * we only overwrite the partitions match the map.
- * @return returns this.
+ * Build from a {@link DataStream} of {@link Row}. A {@link
DataType} must be provided for the builder to
+ * convert those {@link Row}s to a {@link RowData} DataStream.
*/
- public FlinkSinkBuilder withOverwritePartition(
- @Nullable Map<String, String> overwritePartition) {
- this.overwritePartition = overwritePartition;
+ public FlinkSinkBuilder forRow(DataStream<Row> input, DataType
rowDataType) {
+ RowType rowType = (RowType) rowDataType.getLogicalType();
+ DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new
DataType[0]);
+
+ DataFormatConverters.RowConverter converter =
+ new DataFormatConverters.RowConverter(fieldDataTypes);
+ this.input =
+ input.map((MapFunction<Row, RowData>) converter::toInternal)
+ .setParallelism(input.getParallelism())
+ .returns(InternalTypeInfo.of(rowType));
return this;
}
- public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction
logSinkFunction) {
- this.logSinkFunction = logSinkFunction;
+ /** From {@link DataStream} with {@link RowData}. */
+ public FlinkSinkBuilder forRowData(DataStream<RowData> input) {
+ this.input = input;
return this;
}
- public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
- this.parallelism = parallelism;
+ /** INSERT OVERWRITE. */
+ public FlinkSinkBuilder overwrite() {
+ return overwrite(new HashMap<>());
+ }
+
+ /** INSERT OVERWRITE PARTITION (...). */
+ public FlinkSinkBuilder overwrite(Map<String, String> overwritePartition) {
+ this.overwritePartition = overwritePartition;
return this;
}
- public FlinkSinkBuilder withBoundedInputStream(boolean bounded) {
- this.boundedInput = bounded;
+ /** Set sink parallelism. */
+ public FlinkSinkBuilder parallelism(int parallelism) {
+ this.parallelism = parallelism;
return this;
}
- public FlinkSinkBuilder forCompact(boolean compactSink) {
- this.compactSink = compactSink;
+ /**
+ * Set whether the input is bounded. If it is bounded, the append table sink does not
generate a topology for
+ * merging small files.
+ */
+ public FlinkSinkBuilder inputBounded(boolean bounded) {
+ this.boundedInput = bounded;
return this;
}
+ /** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
DataStream<InternalRow> input = MapToInternalRow.map(this.input,
table.rowType());
if (table.coreOptions().localMergeEnabled() &&
table.schema().primaryKeys().size() > 0) {
@@ -143,6 +174,9 @@ public class FlinkSinkBuilder {
checkArgument(
table.primaryKeys().isEmpty(),
"Unaware bucket mode only works with append-only table for
now.");
+ if (boundedInput == null) {
+ boundedInput = !FlinkSink.isStreaming(input);
+ }
return new RowUnawareBucketSink(
table, overwritePartition, logSinkFunction,
parallelism, boundedInput)
.sinkFrom(input);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 30039188d..b6a944703 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -22,12 +22,10 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,6 +45,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
/** Table sink to create sink. */
public abstract class FlinkTableSinkBase
@@ -125,17 +124,20 @@ public abstract class FlinkTableSinkBase
final LogSinkFunction logSinkFunction =
overwrite ? null : (logSinkProvider == null ? null :
logSinkProvider.createSink());
return new PaimonDataStreamSinkProvider(
- (dataStream) ->
- new FlinkSinkBuilder((FileStoreTable) table)
- .withInput(
- new DataStream<>(
-
dataStream.getExecutionEnvironment(),
-
dataStream.getTransformation()))
- .withLogSinkFunction(logSinkFunction)
- .withOverwritePartition(overwrite ?
staticPartitions : null)
-
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
- .withBoundedInputStream(context.isBounded())
- .build());
+ (dataStream) -> {
+ LogFlinkSinkBuilder builder = new
LogFlinkSinkBuilder(table);
+ builder.logSinkFunction(logSinkFunction)
+ .forRowData(
+ new DataStream<>(
+
dataStream.getExecutionEnvironment(),
+ dataStream.getTransformation()))
+ .inputBounded(context.isBounded());
+ if (overwrite) {
+ builder.overwrite(staticPartitions);
+ }
+
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
+ return builder.build();
+ });
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java
new file mode 100644
index 000000000..aa64b3e35
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.Table;
+
+import javax.annotation.Nullable;
+
+/** A special version of {@link FlinkSinkBuilder} with log sink. */
+public class LogFlinkSinkBuilder extends FlinkSinkBuilder {
+
+ public LogFlinkSinkBuilder(Table table) {
+ super(table);
+ }
+
+ FlinkSinkBuilder logSinkFunction(@Nullable LogSinkFunction
logSinkFunction) {
+ this.logSinkFunction = logSinkFunction;
+ return this;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java
new file mode 100644
index 000000000..c30ebc824
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.Table;
+
+/** A special version of {@link FlinkSinkBuilder} for sort compact. */
+public class SortCompactSinkBuilder extends FlinkSinkBuilder {
+
+ public SortCompactSinkBuilder(Table table) {
+ super(table);
+ }
+
+ public FlinkSinkBuilder forCompact(boolean compactSink) {
+ this.compactSink = compactSink;
+ return this;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index ff0312ee5..c4544426f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -190,21 +190,22 @@ public class DataTableSource extends FlinkTableSource {
}
FlinkSourceBuilder sourceBuilder =
- new FlinkSourceBuilder(tableIdentifier, table)
- .withContinuousMode(streaming)
- .withLogSourceProvider(logSourceProvider)
- .withProjection(projectFields)
- .withPredicate(predicate)
- .withLimit(limit)
- .withWatermarkStrategy(watermarkStrategy)
-
.withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
+ new FlinkSourceBuilder(table)
+ .sourceName(tableIdentifier.asSummaryString())
+ .sourceBounded(!streaming)
+ .logSourceProvider(logSourceProvider)
+ .projection(projectFields)
+ .predicate(predicate)
+ .limit(limit)
+ .watermarkStrategy(watermarkStrategy)
+
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
return new PaimonDataStreamScanProvider(
!streaming,
env ->
sourceBuilder
- .withParallelism(inferSourceParallelism(env))
- .withEnv(env)
+ .sourceParallelism(inferSourceParallelism(env))
+ .env(env)
.build());
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index d8878f360..6b542f71c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -24,6 +24,7 @@ import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.utils.TableScanUtils;
@@ -35,6 +36,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -45,33 +47,38 @@ import
org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
+import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
/**
- * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
- * ContinuousFileStoreSource}. This is for normal read/write jobs.
+ * DataStream API for building Flink Source.
+ *
+ * @since 0.8
*/
public class FlinkSourceBuilder {
- private final ObjectIdentifier tableIdentifier;
private final Table table;
private final Options conf;
- private boolean isContinuous = false;
+ private String sourceName;
+ private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@Nullable private int[][] projectedFields;
@Nullable private Predicate predicate;
@@ -81,54 +88,61 @@ public class FlinkSourceBuilder {
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
@Nullable private DynamicPartitionFilteringInfo
dynamicPartitionFilteringInfo;
- public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
- this.tableIdentifier = tableIdentifier;
+ public FlinkSourceBuilder(Table table) {
this.table = table;
+ this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
}
- public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
- this.isContinuous = isContinuous;
+ public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
+ this.env = env;
+ if (sourceBounded == null) {
+ sourceBounded = !FlinkSink.isStreaming(env);
+ }
return this;
}
- public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) {
- this.env = env;
+ public FlinkSourceBuilder sourceName(String name) {
+ this.sourceName = name;
return this;
}
- public FlinkSourceBuilder withProjection(int[][] projectedFields) {
- this.projectedFields = projectedFields;
+ public FlinkSourceBuilder sourceBounded(boolean bounded) {
+ this.sourceBounded = bounded;
return this;
}
- public FlinkSourceBuilder withPredicate(Predicate predicate) {
- this.predicate = predicate;
+ public FlinkSourceBuilder projection(int[] projectedFields) {
+ return projection(Projection.of(projectedFields).toNestedIndexes());
+ }
+
+ public FlinkSourceBuilder projection(int[][] projectedFields) {
+ this.projectedFields = projectedFields;
return this;
}
- public FlinkSourceBuilder withLimit(@Nullable Long limit) {
- this.limit = limit;
+ public FlinkSourceBuilder predicate(Predicate predicate) {
+ this.predicate = predicate;
return this;
}
- public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider
logSourceProvider) {
- this.logSourceProvider = logSourceProvider;
+ public FlinkSourceBuilder limit(@Nullable Long limit) {
+ this.limit = limit;
return this;
}
- public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) {
+ public FlinkSourceBuilder sourceParallelism(@Nullable Integer parallelism)
{
this.parallelism = parallelism;
return this;
}
- public FlinkSourceBuilder withWatermarkStrategy(
+ public FlinkSourceBuilder watermarkStrategy(
@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
return this;
}
- public FlinkSourceBuilder withDynamicPartitionFilteringFields(
+ public FlinkSourceBuilder dynamicPartitionFilteringFields(
List<String> dynamicPartitionFilteringFields) {
if (dynamicPartitionFilteringFields != null &&
!dynamicPartitionFilteringFields.isEmpty()) {
checkState(
@@ -144,6 +158,12 @@ public class FlinkSourceBuilder {
return this;
}
+ @Deprecated
+ FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) {
+ this.logSourceProvider = logSourceProvider;
+ return this;
+ }
+
private ReadBuilder createReadBuilder() {
ReadBuilder readBuilder =
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
@@ -194,7 +214,7 @@ public class FlinkSourceBuilder {
watermarkStrategy == null
? WatermarkStrategy.noWatermarks()
: watermarkStrategy,
- tableIdentifier.asSummaryString(),
+ sourceName,
produceTypeInfo());
if (parallelism != null) {
dataStream.setParallelism(parallelism);
@@ -212,44 +232,58 @@ public class FlinkSourceBuilder {
return InternalTypeInfo.of(produceType);
}
+ /** Build source {@link DataStream} with {@link Row}. */
+ public DataStream<Row> buildForRow() {
+ DataType rowType =
fromLogicalToDataType(toLogicalType(table.rowType()));
+ DataType[] fieldDataTypes = rowType.getChildren().toArray(new
DataType[0]);
+
+ DataFormatConverters.RowConverter converter =
+ new DataFormatConverters.RowConverter(fieldDataTypes);
+ DataStream<RowData> source = build();
+ return source.map((MapFunction<RowData, Row>) converter::toExternal)
+ .setParallelism(source.getParallelism())
+ .returns(ExternalTypeInfo.of(rowType));
+ }
+
+ /** Build source {@link DataStream} with {@link RowData}. */
public DataStream<RowData> build() {
if (env == null) {
throw new IllegalArgumentException("StreamExecutionEnvironment
should not be null.");
}
- if (isContinuous) {
- TableScanUtils.streamingReadingValidate(table);
-
- // TODO visit all options through CoreOptions
- StartupMode startupMode = CoreOptions.startupMode(conf);
- StreamingReadMode streamingReadMode =
CoreOptions.streamReadType(conf);
-
- if (logSourceProvider != null && streamingReadMode != FILE) {
- if (startupMode != StartupMode.LATEST_FULL) {
- return toDataStream(logSourceProvider.createSource(null));
- } else {
- return toDataStream(
- HybridSource.<RowData,
StaticFileStoreSplitEnumerator>builder(
-
LogHybridSourceFactory.buildHybridFirstSource(
- table, projectedFields,
predicate))
- .addSource(
- new
LogHybridSourceFactory(logSourceProvider),
- Boundedness.CONTINUOUS_UNBOUNDED)
- .build());
- }
+ if (sourceBounded) {
+ return buildStaticFileSource();
+ }
+
+ TableScanUtils.streamingReadingValidate(table);
+
+ // TODO visit all options through CoreOptions
+ StartupMode startupMode = CoreOptions.startupMode(conf);
+ StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);
+
+ if (logSourceProvider != null && streamingReadMode != FILE) {
+ if (startupMode != StartupMode.LATEST_FULL) {
+ return toDataStream(logSourceProvider.createSource(null));
} else {
- if
(conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
- return buildAlignedContinuousFileSource();
- } else if (conf.contains(CoreOptions.CONSUMER_ID)
- && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
- == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
- return buildContinuousStreamOperator();
- } else {
- return buildContinuousFileSource();
- }
+ return toDataStream(
+ HybridSource.<RowData,
StaticFileStoreSplitEnumerator>builder(
+
LogHybridSourceFactory.buildHybridFirstSource(
+ table, projectedFields,
predicate))
+ .addSource(
+ new
LogHybridSourceFactory(logSourceProvider),
+ Boundedness.CONTINUOUS_UNBOUNDED)
+ .build());
}
} else {
- return buildStaticFileSource();
+ if
(conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
+ return buildAlignedContinuousFileSource();
+ } else if (conf.contains(CoreOptions.CONSUMER_ID)
+ && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
+ == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
+ return buildContinuousStreamOperator();
+ } else {
+ return buildContinuousFileSource();
+ }
}
}
@@ -262,7 +296,7 @@ public class FlinkSourceBuilder {
dataStream =
MonitorFunction.buildSource(
env,
- tableIdentifier.asSummaryString(),
+ sourceName,
produceTypeInfo(),
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
@@ -276,7 +310,7 @@ public class FlinkSourceBuilder {
return dataStream;
}
- public void
assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
+ private void
assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkArgument(
checkpointConfig.isCheckpointingEnabled(),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 6b68532a2..dff589e92 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -35,12 +35,14 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.GenericRowData;
@@ -50,6 +52,7 @@ import
org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -135,17 +138,54 @@ public class FileStoreITCase extends AbstractTestBase {
return new SerializableRowData(row,
InternalSerializers.create(TABLE_TYPE));
}
+ @TestTemplate
+ public void testRowSourceSink() throws Exception {
+ FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[]
{1, 2});
+
+ // write
+ DataStreamSource<RowData> source = buildTestSource(env, isBatch);
+ DataStream<Row> input =
+ source.map(
+ (MapFunction<RowData, Row>)
+ r ->
+ Row.of(
+ r.getInt(0),
+
r.getString(1).toString(),
+ r.getInt(2)))
+ .setParallelism(source.getParallelism());
+ DataType inputType =
+ DataTypes.ROW(
+ DataTypes.FIELD("v", DataTypes.INT()),
+ DataTypes.FIELD("p", DataTypes.STRING()),
+ DataTypes.FIELD("_k", DataTypes.INT()));
+ new FlinkSinkBuilder(table).forRow(input, inputType).build();
+ env.execute();
+
+ // read
+ List<Row> results =
+ executeAndCollectRow(
+ new
FlinkSourceBuilder(table).env(env).sourceBounded(true).buildForRow());
+
+ // assert
+ Row[] expected =
+ new Row[] {
+ Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1",
1), Row.of(0, "p1", 2)
+ };
+ assertThat(results).containsExactlyInAnyOrder(expected);
+ }
+
@TestTemplate
public void testPartitioned() throws Exception {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[]
{1, 2});
// write
- new FlinkSinkBuilder(table).withInput(buildTestSource(env,
isBatch)).build();
+ new FlinkSinkBuilder(table).forRowData(buildTestSource(env,
isBatch)).build();
env.execute();
// read
List<Row> results =
- executeAndCollect(new FlinkSourceBuilder(IDENTIFIER,
table).withEnv(env).build());
+ executeAndCollect(
+ new
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
// assert
Row[] expected =
@@ -160,12 +200,13 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
// write
- new FlinkSinkBuilder(table).withInput(buildTestSource(env,
isBatch)).build();
+ new FlinkSinkBuilder(table).forRowData(buildTestSource(env,
isBatch)).build();
env.execute();
// read
List<Row> results =
- executeAndCollect(new FlinkSourceBuilder(IDENTIFIER,
table).withEnv(env).build());
+ executeAndCollect(
+ new
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
// assert
Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2),
Row.of(3, "p2", 5)};
@@ -179,7 +220,7 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[]
{1, 2});
// write
- new FlinkSinkBuilder(table).withInput(buildTestSource(env,
isBatch)).build();
+ new FlinkSinkBuilder(table).forRowData(buildTestSource(env,
isBatch)).build();
env.execute();
// overwrite p2
@@ -190,15 +231,13 @@ public class FileStoreITCase extends AbstractTestBase {
InternalTypeInfo.of(TABLE_TYPE));
Map<String, String> overwrite = new HashMap<>();
overwrite.put("p", "p2");
- new FlinkSinkBuilder(table)
- .withInput(partialData)
- .withOverwritePartition(overwrite)
- .build();
+ new
FlinkSinkBuilder(table).forRowData(partialData).overwrite(overwrite).build();
env.execute();
// read
List<Row> results =
- executeAndCollect(new FlinkSourceBuilder(IDENTIFIER,
table).withEnv(env).build());
+ executeAndCollect(
+ new
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1),
Row.of(0, "p1", 2)};
assertThat(results).containsExactlyInAnyOrder(expected);
@@ -209,14 +248,13 @@ public class FileStoreITCase extends AbstractTestBase {
Collections.singletonList(
wrap(GenericRowData.of(19,
StringData.fromString("p2"), 6))),
InternalTypeInfo.of(TABLE_TYPE));
- new FlinkSinkBuilder(table)
- .withInput(partialData)
- .withOverwritePartition(new HashMap<>())
- .build();
+ new
FlinkSinkBuilder(table).forRowData(partialData).overwrite().build();
env.execute();
// read
- results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER,
table).withEnv(env).build());
+ results =
+ executeAndCollect(
+ new
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1),
Row.of(0, "p1", 2)};
assertThat(results).containsExactlyInAnyOrder(expected);
@@ -230,13 +268,15 @@ public class FileStoreITCase extends AbstractTestBase {
table.copy(
Collections.singletonMap(
CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false")))
- .withInput(partialData)
- .withOverwritePartition(new HashMap<>())
+ .forRowData(partialData)
+ .overwrite()
.build();
env.execute();
// read
- results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER,
table).withEnv(env).build());
+ results =
+ executeAndCollect(
+ new
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
expected = new Row[] {Row.of(20, "p2", 3)};
assertThat(results).containsExactlyInAnyOrder(expected);
}
@@ -246,12 +286,13 @@ public class FileStoreITCase extends AbstractTestBase {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]);
// write
- new FlinkSinkBuilder(table).withInput(buildTestSource(env,
isBatch)).build();
+ new FlinkSinkBuilder(table).forRowData(buildTestSource(env,
isBatch)).build();
env.execute();
// read
List<Row> results =
- executeAndCollect(new FlinkSourceBuilder(IDENTIFIER,
table).withEnv(env).build());
+ executeAndCollect(
+ new
FlinkSourceBuilder(table).sourceBounded(true).env(env).build());
// assert
// in streaming mode, expect origin data X 2 (FiniteTestSource)
@@ -275,7 +316,7 @@ public class FileStoreITCase extends AbstractTestBase {
private void testProjection(FileStoreTable table) throws Exception {
// write
- new FlinkSinkBuilder(table).withInput(buildTestSource(env,
isBatch)).build();
+ new FlinkSinkBuilder(table).forRowData(buildTestSource(env,
isBatch)).build();
env.execute();
// read
@@ -288,9 +329,10 @@ public class FileStoreITCase extends AbstractTestBase {
projection.project(TABLE_TYPE)));
List<Row> results =
executeAndCollect(
- new FlinkSourceBuilder(IDENTIFIER, table)
- .withProjection(projection.toNestedIndexes())
- .withEnv(env)
+ new FlinkSourceBuilder(table)
+ .sourceBounded(true)
+ .projection(projection.toNestedIndexes())
+ .env(env)
.build(),
converter);
@@ -328,10 +370,7 @@ public class FileStoreITCase extends AbstractTestBase {
table.copy(
Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024"));
DataStream<RowData> source =
- new FlinkSourceBuilder(IDENTIFIER, table)
- .withContinuousMode(true)
- .withEnv(env)
- .build();
+ new
FlinkSourceBuilder(table).sourceBounded(false).env(env).build();
Transformation<RowData> transformation = source.getTransformation();
assertThat(transformation).isInstanceOf(SourceTransformation.class);
assertThat(((SourceTransformation<?, ?, ?>)
transformation).getSource().getBoundedness())
@@ -343,9 +382,9 @@ public class FileStoreITCase extends AbstractTestBase {
BlockingIterator<RowData, Row> iterator =
BlockingIterator.of(
- new FlinkSourceBuilder(IDENTIFIER, table)
- .withContinuousMode(true)
- .withEnv(env)
+ new FlinkSourceBuilder(table)
+ .sourceBounded(false)
+ .env(env)
.build()
.executeAndCollect(),
CONVERTER::toExternal);
@@ -382,7 +421,7 @@ public class FileStoreITCase extends AbstractTestBase {
}
DataStreamSource<RowData> source =
env.addSource(new FiniteTestSource<>(src, true),
InternalTypeInfo.of(TABLE_TYPE));
- new FlinkSinkBuilder(table).withInput(source).build();
+ new FlinkSinkBuilder(table).forRowData(source).build();
env.execute();
assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
}
@@ -468,4 +507,14 @@ public class FileStoreITCase extends AbstractTestBase {
iterator.close();
return results;
}
+
+ public static List<Row> executeAndCollectRow(DataStream<Row> source)
throws Exception {
+ CloseableIterator<Row> iterator = source.executeAndCollect();
+ List<Row> results = new ArrayList<>();
+ while (iterator.hasNext()) {
+ results.add(iterator.next());
+ }
+ iterator.close();
+ return results;
+ }
}