This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7ff1380d5 [flink] Replace FileStoreTable to public Table in flink
sources (#730)
7ff1380d5 is described below
commit 7ff1380d5b5332b145125139567e89632db47c33
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 29 13:40:53 2023 +0800
[flink] Replace FileStoreTable to public Table in flink sources (#730)
---
.../main/java/org/apache/paimon/CoreOptions.java | 4 +
.../paimon/table/source/BatchDataTableScan.java | 12 ---
.../paimon/table/source/StreamDataTableScan.java | 52 ----------
.../paimon/table/source/TableStreamingReader.java | 32 +++----
.../paimon/flink/AbstractFlinkTableFactory.java | 30 +++---
.../org/apache/paimon/flink/DataCatalogTable.java | 7 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 19 ++--
.../apache/paimon/flink/action/CompactAction.java | 6 ++
.../flink/lookup/FileStoreLookupFunction.java | 37 ++++---
.../apache/paimon/flink/sink/FlinkTableSink.java | 14 +--
.../flink/source/CompactorSourceBuilder.java | 37 +++----
.../source/ContinuousFileSplitEnumerator.java | 10 +-
.../flink/source/ContinuousFileStoreSource.java | 40 ++++----
.../paimon/flink/source/DataTableSource.java | 21 ++--
.../paimon/flink/source/FlinkSourceBuilder.java | 26 +++--
.../paimon/flink/source/FlinkTableSource.java | 2 +-
.../paimon/flink/source/SimpleSystemSource.java | 59 ------------
.../paimon/flink/source/StaticFileStoreSource.java | 55 ++++-------
.../source/StaticFileStoreSplitEnumerator.java | 2 +-
.../paimon/flink/source/SystemTableSource.java | 32 +++----
.../apache/paimon/flink/utils/TableScanUtils.java | 106 +++++++++++++++++++++
.../org/apache/paimon/flink/LookupJoinITCase.java | 5 +-
.../apache/paimon/flink/ReadWriteTableITCase.java | 4 +-
23 files changed, 273 insertions(+), 339 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 8feb424a9..149ca6f24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -544,6 +544,10 @@ public class CoreOptions implements Serializable {
this.options = options;
}
+ public static CoreOptions fromMap(Map<String, String> options) {
+ return new CoreOptions(options);
+ }
+
public Options toConfiguration() {
return options;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
index e2839253e..a25f2dfae 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
@@ -20,12 +20,9 @@ package org.apache.paimon.table.source;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.Filter;
-import java.io.Serializable;
-
/** {@link DataTableScan} for batch planning. */
public interface BatchDataTableScan extends DataTableScan {
@@ -42,13 +39,4 @@ public interface BatchDataTableScan extends DataTableScan {
BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter);
BatchDataTableScan withStartingScanner(StartingScanner startingScanner);
-
- // ------------------------------------------------------------------------
- // factory interface
- // ------------------------------------------------------------------------
-
- /** Factory to create {@link BatchDataTableScan}. */
- interface Factory extends Serializable {
- BatchDataTableScan create(DataTable dataTable);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index 471c8428d..4d60c4402 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -18,21 +18,13 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.Filter;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
/** {@link DataTableScan} for streaming planning. */
public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan {
@@ -57,48 +49,4 @@ public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan
StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker);
StreamDataTableScan withSnapshotStarting();
-
- static void validate(TableSchema schema) {
- CoreOptions options = new CoreOptions(schema.options());
- CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
- new HashMap<CoreOptions.MergeEngine, String>() {
- {
- put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial
update");
- put(CoreOptions.MergeEngine.AGGREGATE,
"Pre-aggregate");
- }
- };
- if (schema.primaryKeys().size() > 0 &&
mergeEngineDesc.containsKey(mergeEngine)) {
- switch (options.changelogProducer()) {
- case NONE:
- case INPUT:
- throw new RuntimeException(
- mergeEngineDesc.get(mergeEngine)
- + " continuous reading is not supported.
You can use "
- + "'lookup' or 'full-compaction' changelog
producer to support streaming reading.");
- default:
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // factory interface
- // ------------------------------------------------------------------------
-
- /** Factory to create {@link StreamDataTableScan}. */
- interface Factory extends Serializable {
-
- StreamDataTableScan create(DataTable dataTable, @Nullable Long
nextSnapshotId);
- }
-
- /** A default {@link Factory} to create {@link StreamDataTableScan}. */
- class DefaultFactory implements Factory {
-
- @Override
- public StreamDataTableScan create(DataTable dataTable, @Nullable Long
nextSnapshotId) {
- StreamDataTableScan scan = dataTable.newStreamScan();
- scan.restore(nextSnapshotId);
- return scan;
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
index 28c2d9fed..d1f2e1dff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
@@ -23,7 +23,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
import org.apache.paimon.utils.TypeUtils;
@@ -44,21 +44,16 @@ import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
/** A streaming reader to read table. */
public class TableStreamingReader {
- private final FileStoreTable table;
- private final int[] projection;
- @Nullable private final Predicate predicate;
+ private final ReadBuilder readBuilder;
@Nullable private final PredicateFilter recordFilter;
private final StreamDataTableScan scan;
- public TableStreamingReader(
- FileStoreTable table, int[] projection, @Nullable Predicate
predicate) {
- this.table = table;
- this.projection = projection;
- this.predicate = predicate;
+ public TableStreamingReader(Table table, int[] projection, @Nullable
Predicate predicate) {
+ this.readBuilder =
table.newReadBuilder().withProjection(projection).withFilter(predicate);
if (predicate != null) {
- List<String> fieldNames = table.schema().fieldNames();
- List<String> primaryKeys = table.schema().primaryKeys();
+ List<String> fieldNames = table.rowType().getFieldNames();
+ List<String> primaryKeys = table.primaryKeys();
// for pk table: only filter by pk, the stream is upsert instead
of changelog
// for non-pk table: filter all
@@ -71,20 +66,18 @@ public class TableStreamingReader {
};
int[] fieldIdxToProjectionIdx =
- IntStream.range(0,
table.schema().fields().size()).map(operator).toArray();
+ IntStream.range(0,
table.rowType().getFieldCount()).map(operator).toArray();
this.recordFilter =
new PredicateFilter(
- TypeUtils.project(table.schema().logicalRowType(),
projection),
+ TypeUtils.project(table.rowType(), projection),
transformFieldMapping(predicate,
fieldIdxToProjectionIdx).orElse(null));
} else {
recordFilter = null;
}
- scan = table.newStreamScan().withSnapshotStarting();
- if (predicate != null) {
- scan.withFilter(predicate);
- }
+ scan = (StreamDataTableScan) readBuilder.newStreamScan();
+ scan.withSnapshotStarting();
}
public Iterator<InternalRow> nextBatch() throws Exception {
@@ -97,10 +90,7 @@ public class TableStreamingReader {
}
private Iterator<InternalRow> read(DataFilePlan plan) throws IOException {
- InnerTableRead read = table.newRead().withProjection(projection);
- if (predicate != null) {
- read.withFilter(predicate);
- }
+ TableRead read = readBuilder.newRead();
List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
for (DataSplit split : plan.splits) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index c12a5b0b2..b7ea563ae 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -28,9 +28,8 @@ import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -70,15 +69,11 @@ public abstract class AbstractFlinkTableFactory
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
- int splitBatchSize =
- Options.fromMap(origin.getOptions())
-
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
- return new SystemTableSource(
- ((SystemCatalogTable) origin).table(), isStreamingMode,
splitBatchSize);
+ return new SystemTableSource(((SystemCatalogTable)
origin).table(), isStreamingMode);
} else {
return new DataTableSource(
context.getObjectIdentifier(),
- buildFileStoreTable(context),
+ buildPaimonTable(context),
isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
@@ -89,7 +84,7 @@ public abstract class AbstractFlinkTableFactory
public DynamicTableSink createDynamicTableSink(Context context) {
return new FlinkTableSink(
context.getObjectIdentifier(),
- buildFileStoreTable(context),
+ buildPaimonTable(context),
context,
createOptionalLogStoreFactory(context).orElse(null));
}
@@ -144,16 +139,15 @@ public abstract class AbstractFlinkTableFactory
context.getCatalogTable().getOptions(),
context.getConfiguration());
}
- static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context
context) {
+ static Table buildPaimonTable(DynamicTableFactory.Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
- FileStoreTable table;
+ Table table;
if (origin instanceof DataCatalogTable) {
table = ((DataCatalogTable)
origin).table().copy(origin.getOptions());
} else {
table =
FileStoreTableFactory.create(createCatalogContext(context));
}
- TableSchema tableSchema = table.schema();
Schema schema =
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
RowType rowType = toLogicalType(schema.rowType());
@@ -162,24 +156,24 @@ public abstract class AbstractFlinkTableFactory
// compare fields to ignore the outside nullability and nested fields'
comments
Preconditions.checkArgument(
- schemaEquals(toLogicalType(tableSchema.logicalRowType()),
rowType),
+ schemaEquals(toLogicalType(table.rowType()), rowType),
"Flink schema and store schema are not the same, "
+ "store schema is %s, Flink schema is %s",
- tableSchema.logicalRowType(),
+ table.rowType(),
rowType);
Preconditions.checkArgument(
- tableSchema.partitionKeys().equals(partitionKeys),
+ table.partitionKeys().equals(partitionKeys),
"Flink partitionKeys and store partitionKeys are not the same,
"
+ "store partitionKeys is %s, Flink partitionKeys is
%s",
- tableSchema.partitionKeys(),
+ table.partitionKeys(),
partitionKeys);
Preconditions.checkArgument(
- tableSchema.primaryKeys().equals(primaryKeys),
+ table.primaryKeys().equals(primaryKeys),
"Flink primaryKeys and store primaryKeys are not the same, "
+ "store primaryKeys is %s, Flink primaryKeys is %s",
- tableSchema.primaryKeys(),
+ table.primaryKeys(),
primaryKeys);
return table;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
index 6b33ac501..1dd62cb2c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -33,10 +34,10 @@ import java.util.Map;
/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
public class DataCatalogTable extends CatalogTableImpl {
- private final FileStoreTable table;
+ private final Table table;
public DataCatalogTable(
- FileStoreTable table,
+ Table table,
TableSchema tableSchema,
List<String> partitionKeys,
Map<String, String> properties,
@@ -45,7 +46,7 @@ public class DataCatalogTable extends CatalogTableImpl {
this.table = table;
}
- public FileStoreTable table() {
+ public Table table() {
return table;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 618c4f735..01dc89d50 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -166,7 +166,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
if (table instanceof FileStoreTable) {
- return toCatalogTable((FileStoreTable) table);
+ return toCatalogTable(table);
} else {
return new SystemCatalogTable(table);
}
@@ -314,9 +314,9 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
- private CatalogTableImpl toCatalogTable(FileStoreTable table) {
+ private CatalogTableImpl toCatalogTable(Table table) {
TableSchema schema;
- Map<String, String> newOptions = new
HashMap<>(table.schema().options());
+ Map<String, String> newOptions = new HashMap<>(table.options());
// try to read schema from options
// in the case of virtual columns and watermark
@@ -334,23 +334,18 @@ public class FlinkCatalog extends AbstractCatalog {
removeProperties.asMap().keySet().forEach(newOptions::remove);
} else {
TableSchema.Builder builder = TableSchema.builder();
- for (RowType.RowField field :
-
toLogicalType(table.schema().logicalRowType()).getFields()) {
+ for (RowType.RowField field :
toLogicalType(table.rowType()).getFields()) {
builder.field(field.getName(),
fromLogicalToDataType(field.getType()));
}
- if (table.schema().primaryKeys().size() > 0) {
- builder.primaryKey(table.schema().primaryKeys().toArray(new
String[0]));
+ if (table.primaryKeys().size() > 0) {
+ builder.primaryKey(table.primaryKeys().toArray(new String[0]));
}
schema = builder.build();
}
return new DataCatalogTable(
- table,
- schema,
- table.schema().partitionKeys(),
- newOptions,
- table.schema().comment());
+ table, schema, table.partitionKeys(), newOptions,
table.comment().orElse(""));
}
public static Schema fromCatalogTable(CatalogTable catalogTable) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 5b7c329e1..d21d5e704 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -53,6 +53,12 @@ public class CompactAction extends ActionBase {
CompactAction(String warehouse, String database, String tableName) {
super(warehouse, database, tableName, new
Options().set(CoreOptions.WRITE_ONLY, false));
+ if (!(table instanceof FileStoreTable)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Only FileStoreTable supports compact action. The
table type is '%s'.",
+ table.getClass().getName()));
+ }
sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(),
(FileStoreTable) table);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 748850284..629f6effd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -22,12 +22,11 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.TableStreamingReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
@@ -70,7 +69,7 @@ public class FileStoreLookupFunction implements Serializable,
Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(FileStoreLookupFunction.class);
- private final FileStoreTable table;
+ private final Table table;
private final List<String> projectFields;
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
@@ -85,31 +84,27 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
private transient TableStreamingReader streamingReader;
public FileStoreLookupFunction(
- FileStoreTable table,
- int[] projection,
- int[] joinKeyIndex,
- @Nullable Predicate predicate) {
- TableSchema schema = table.schema();
+ Table table, int[] projection, int[] joinKeyIndex, @Nullable
Predicate predicate) {
checkArgument(
- schema.partitionKeys().isEmpty(), "Currently only support
non-partitioned table.");
- checkArgument(schema.primaryKeys().size() > 0, "Currently only support
primary key table.");
- StreamDataTableScan.validate(table.schema());
+ table.partitionKeys().isEmpty(), "Currently only support
non-partitioned table.");
+ checkArgument(table.primaryKeys().size() > 0, "Currently only support
primary key table.");
+ TableScanUtils.streamingReadingValidate(table);
this.table = table;
// join keys are based on projection fields
this.joinKeys =
Arrays.stream(joinKeyIndex)
- .mapToObj(i -> schema.fieldNames().get(projection[i]))
+ .mapToObj(i ->
table.rowType().getFieldNames().get(projection[i]))
.collect(Collectors.toList());
this.projectFields =
Arrays.stream(projection)
- .mapToObj(i -> schema.fieldNames().get(i))
+ .mapToObj(i -> table.rowType().getFieldNames().get(i))
.collect(Collectors.toList());
// add primary keys
- for (String field : schema.primaryKeys()) {
+ for (String field : table.primaryKeys()) {
if (!projectFields.contains(field)) {
projectFields.add(field);
}
@@ -122,20 +117,20 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
String tmpDirectory = getTmpDirectory(context);
this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
- Options options = Options.fromMap(table.schema().options());
+ Options options = Options.fromMap(table.options());
this.refreshInterval =
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
this.stateFactory = new RocksDBStateFactory(path.toString(), options);
- List<String> fieldNames =
table.schema().logicalRowType().getFieldNames();
+ List<String> fieldNames = table.rowType().getFieldNames();
int[] projection =
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
- RowType rowType = TypeUtils.project(table.schema().logicalRowType(),
projection);
+ RowType rowType = TypeUtils.project(table.rowType(), projection);
PredicateFilter recordFilter = createRecordFilter(projection);
this.lookupTable =
LookupTable.create(
stateFactory,
rowType,
- table.schema().primaryKeys(),
+ table.primaryKeys(),
joinKeys,
recordFilter,
options.get(LOOKUP_CACHE_ROWS));
@@ -153,13 +148,13 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
adjustedPredicate =
transformFieldMapping(
this.predicate,
- IntStream.range(0,
table.schema().fields().size())
+ IntStream.range(0,
table.rowType().getFieldCount())
.map(i -> Ints.indexOf(projection,
i))
.toArray())
.orElse(null);
}
return new PredicateFilter(
- TypeUtils.project(table.schema().logicalRowType(),
projection), adjustedPredicate);
+ TypeUtils.project(table.rowType(), projection),
adjustedPredicate);
}
public Collection<RowData> lookup(RowData keyRow) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 4d20118f8..6b983265e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -56,7 +57,7 @@ import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite,
SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
- private final FileStoreTable table;
+ private final Table table;
private final DynamicTableFactory.Context context;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
@@ -66,7 +67,7 @@ public class FlinkTableSink implements DynamicTableSink,
SupportsOverwrite, Supp
public FlinkTableSink(
ObjectIdentifier tableIdentifier,
- FileStoreTable table,
+ Table table,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this.tableIdentifier = tableIdentifier;
@@ -85,7 +86,7 @@ public class FlinkTableSink implements DynamicTableSink,
SupportsOverwrite, Supp
// no primary key, sink all changelogs
return requestedMode;
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
- Options options = Options.fromMap(table.schema().options());
+ Options options = Options.fromMap(table.options());
if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
return requestedMode;
}
@@ -124,13 +125,13 @@ public class FlinkTableSink implements DynamicTableSink,
SupportsOverwrite, Supp
logSinkProvider =
logStoreTableFactory.createSinkProvider(this.context, context);
}
- Options conf = Options.fromMap(table.schema().options());
+ Options conf = Options.fromMap(table.options());
// Do not sink to log store when overwrite mode
final LogSinkFunction logSinkFunction =
overwrite ? null : (logSinkProvider == null ? null :
logSinkProvider.createSink());
return new PaimonDataStreamSinkProvider(
(dataStream) ->
- new FlinkSinkBuilder(table)
+ new FlinkSinkBuilder((FileStoreTable) table)
.withInput(
new DataStream<>(
dataStream.getExecutionEnvironment(),
@@ -163,8 +164,7 @@ public class FlinkTableSink implements DynamicTableSink,
SupportsOverwrite, Supp
@Override
public void applyStaticPartition(Map<String, String> partition) {
- table.schema()
- .partitionKeys()
+ table.partitionKeys()
.forEach(
partitionKey -> {
if (partition.containsKey(partitionKey)) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 731d2d787..9761c4cbe 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -18,15 +18,13 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
-import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.types.RowType;
@@ -93,31 +91,22 @@ public class CompactorSourceBuilder {
.toArray(Predicate[]::new));
}
+ ReadBuilder readBuilder =
bucketsTable.newReadBuilder().withFilter(partitionPredicate);
if (isContinuous) {
return new ContinuousFileStoreSource(
- bucketsTable,
+ readBuilder,
+ bucketsTable.options(),
null,
- partitionPredicate,
- null,
- (table, nextSnapshotId) -> {
- StreamDataTableScan scan =
- table.newStreamScan()
- .withStartingScanner(
- new
ContinuousCompactorStartingScanner())
- .withFollowUpScanner(
- new
ContinuousCompactorFollowUpScanner())
-
.withBoundedChecker(BoundedChecker.neverEnd());
- scan.restore(nextSnapshotId);
- return scan;
- });
+ TableScanUtils.compactStreamScanFactory());
} else {
return new StaticFileStoreSource(
- bucketsTable,
- null,
- partitionPredicate,
+ readBuilder,
null,
- // static compactor source will compact all current files
- table -> table.newScan().withStartingScanner(new
FullStartingScanner()));
+ bucketsTable
+ .coreOptions()
+ .toConfiguration()
+
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+ TableScanUtils.compactBatchScanFactory());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 0eae09631..771c5b57a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -19,9 +19,9 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -62,7 +62,7 @@ public class ContinuousFileSplitEnumerator
private final FileStoreSourceSplitGenerator splitGenerator;
- private final StreamDataTableScan scan;
+ private final StreamTableScan scan;
@Nullable private Long nextSnapshotId;
@@ -73,7 +73,7 @@ public class ContinuousFileSplitEnumerator
Collection<FileStoreSourceSplit> remainSplits,
@Nullable Long nextSnapshotId,
long discoveryInterval,
- StreamDataTableScan scan) {
+ StreamTableScan scan) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.bucketSplits = new HashMap<>();
@@ -150,7 +150,7 @@ public class ContinuousFileSplitEnumerator
// ------------------------------------------------------------------------
- private void processDiscoveredSplits(DataFilePlan plan, Throwable error) {
+ private void processDiscoveredSplits(TableScan.Plan plan, Throwable error)
{
if (error != null) {
if (error instanceof EndOfScanException) {
// finished
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 909a736fe..09abb116d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -18,9 +18,10 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.Boundedness;
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
import static
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
@@ -40,28 +42,22 @@ public class ContinuousFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 3L;
- private final DataTable table;
- private final StreamDataTableScan.Factory scanFactory;
- private final Predicate predicate;
+ private final Map<String, String> options;
+ private final TableScanUtils.StreamTableScanFactory scanFactory;
public ContinuousFileStoreSource(
- DataTable table,
- @Nullable int[][] projectedFields,
- @Nullable Predicate predicate,
- @Nullable Long limit) {
- this(table, projectedFields, predicate, limit, new
StreamDataTableScan.DefaultFactory());
+ ReadBuilder readBuilder, Map<String, String> options, @Nullable
Long limit) {
+ this(readBuilder, options, limit,
TableScanUtils.defaultStreamScanFactory());
}
public ContinuousFileStoreSource(
- DataTable table,
- @Nullable int[][] projectedFields,
- @Nullable Predicate predicate,
+ ReadBuilder readBuilder,
+ Map<String, String> options,
@Nullable Long limit,
- StreamDataTableScan.Factory scanFactory) {
-
super(table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
limit);
- this.table = table;
+ TableScanUtils.StreamTableScanFactory scanFactory) {
+ super(readBuilder, limit);
+ this.options = options;
this.scanFactory = scanFactory;
- this.predicate = predicate;
}
@Override
@@ -84,19 +80,19 @@ public class ContinuousFileStoreSource extends FlinkSource {
context,
splits,
nextSnapshotId,
- table.coreOptions().continuousDiscoveryInterval().toMillis(),
- scanFactory.create(table,
nextSnapshotId).withFilter(predicate));
+
CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
+ scanFactory.create(readBuilder, nextSnapshotId));
}
@Override
public FileStoreSourceReader<?> createSourceReader(
SourceReaderContext context, TableRead read, @Nullable Long limit)
{
- return table.coreOptions().toConfiguration().get(STREAMING_READ_ATOMIC)
+ return Options.fromMap(options).get(STREAMING_READ_ATOMIC)
? new FileStoreSourceReader<>(RecordsFunction.forSingle(),
context, read, limit)
: new FileStoreSourceReader<>(RecordsFunction.forIterate(),
context, read, limit);
}
private boolean isBounded() {
- return table.coreOptions().scanBoundedWatermark() != null;
+ return CoreOptions.fromMap(options).scanBoundedWatermark() != null;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 121156dec..6529c8f04 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -33,7 +33,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -64,14 +64,14 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_
* Table source to create {@link StaticFileStoreSource} or {@link
ContinuousFileStoreSource} under
* batch mode or change-tracking is disabled. For streaming mode with
change-tracking enabled and
* FULL scan mode, it will create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link
StaticFileStoreSource} and
- * kafka log source created by {@link LogSourceProvider}.
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
+ * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created
by {@link
+ * LogSourceProvider}.
*/
public class DataTableSource extends FlinkTableSource
implements LookupTableSource, SupportsWatermarkPushDown {
private final ObjectIdentifier tableIdentifier;
- private final FileStoreTable table;
private final boolean streaming;
private final DynamicTableFactory.Context context;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
@@ -80,7 +80,7 @@ public class DataTableSource extends FlinkTableSource
public DataTableSource(
ObjectIdentifier tableIdentifier,
- FileStoreTable table,
+ Table table,
boolean streaming,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
@@ -98,7 +98,7 @@ public class DataTableSource extends FlinkTableSource
private DataTableSource(
ObjectIdentifier tableIdentifier,
- FileStoreTable table,
+ Table table,
boolean streaming,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory,
@@ -108,7 +108,6 @@ public class DataTableSource extends FlinkTableSource
@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
super(table, predicate, projectFields, limit);
this.tableIdentifier = tableIdentifier;
- this.table = table;
this.streaming = streaming;
this.context = context;
this.logStoreTableFactory = logStoreTableFactory;
@@ -130,7 +129,7 @@ public class DataTableSource extends FlinkTableSource
} else if (table instanceof ChangelogValueCountFileStoreTable) {
return ChangelogMode.all();
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
- Options options = Options.fromMap(table.schema().options());
+ Options options = Options.fromMap(table.options());
if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
return ChangelogMode.all();
@@ -164,7 +163,7 @@ public class DataTableSource extends FlinkTableSource
}
WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
- Options options = table.coreOptions().toConfiguration();
+ Options options = Options.fromMap(table.options());
if (watermarkStrategy != null) {
WatermarkEmitStrategy emitStrategy =
options.get(SCAN_WATERMARK_EMIT_STRATEGY);
if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
@@ -199,7 +198,7 @@ public class DataTableSource extends FlinkTableSource
.withPredicate(predicate)
.withLimit(limit)
.withParallelism(
- Options.fromMap(table.schema().options())
+ Options.fromMap(table.options())
.get(FlinkConnectorOptions.SCAN_PARALLELISM))
.withWatermarkStrategy(watermarkStrategy);
@@ -239,7 +238,7 @@ public class DataTableSource extends FlinkTableSource
}
int[] projection =
projectFields == null
- ? IntStream.range(0,
table.schema().fields().size()).toArray()
+ ? IntStream.range(0,
table.rowType().getFieldCount()).toArray()
: Projection.of(projectFields).toTopLevelIndexes();
int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
return LookupRuntimeProviderFactory.create(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 0876926a5..7de7e3c2c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -20,12 +20,13 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.Table;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
@@ -52,7 +53,7 @@ import static
org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
public class FlinkSourceBuilder {
private final ObjectIdentifier tableIdentifier;
- private final FileStoreTable table;
+ private final Table table;
private final Options conf;
private boolean isContinuous = false;
@@ -64,10 +65,10 @@ public class FlinkSourceBuilder {
@Nullable private Long limit;
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
- public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, FileStoreTable
table) {
+ public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
this.tableIdentifier = tableIdentifier;
this.table = table;
- this.conf = Options.fromMap(table.schema().options());
+ this.conf = Options.fromMap(table.options());
}
public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
@@ -112,16 +113,23 @@ public class FlinkSourceBuilder {
}
private StaticFileStoreSource buildStaticFileSource() {
- return new StaticFileStoreSource(table, projectedFields, predicate,
limit);
+ return new StaticFileStoreSource(
+
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
+ limit,
+ Options.fromMap(table.options())
+
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
}
private ContinuousFileStoreSource buildContinuousFileSource() {
- return new ContinuousFileStoreSource(table, projectedFields,
predicate, limit);
+ return new ContinuousFileStoreSource(
+
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
+ table.options(),
+ limit);
}
public Source<RowData, ?, ?> buildSource() {
if (isContinuous) {
- StreamDataTableScan.validate(table.schema());
+ TableScanUtils.streamingReadingValidate(table);
// TODO visit all options through CoreOptions
StartupMode startupMode = CoreOptions.startupMode(conf);
@@ -149,7 +157,7 @@ public class FlinkSourceBuilder {
throw new IllegalArgumentException("StreamExecutionEnvironment
should not be null.");
}
- RowType rowType = toLogicalType(table.schema().logicalRowType());
+ RowType rowType = toLogicalType(table.rowType());
LogicalType produceType =
Optional.ofNullable(projectedFields)
.map(Projection::of)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 541c1a26c..143fe0039 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -43,7 +43,7 @@ public abstract class FlinkTableSource
SupportsProjectionPushDown,
SupportsLimitPushDown {
- private final Table table;
+ protected final Table table;
@Nullable protected Predicate predicate;
@Nullable protected int[][] projectFields;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
deleted file mode 100644
index 5fe27f25d..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source;
-
-import org.apache.paimon.table.source.ReadBuilder;
-
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-
-/** A {@link FlinkSource} for system table. */
-public class SimpleSystemSource extends FlinkSource {
-
- private static final long serialVersionUID = 2L;
- private final int splitBatchSize;
-
- public SimpleSystemSource(ReadBuilder readBuilder, @Nullable Long limit,
int splitBatchSize) {
- super(readBuilder, limit);
- this.splitBatchSize = splitBatchSize;
- }
-
- @Override
- public Boundedness getBoundedness() {
- return Boundedness.BOUNDED;
- }
-
- @Override
- public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint>
restoreEnumerator(
- SplitEnumeratorContext<FileStoreSourceSplit> context,
- PendingSplitsCheckpoint checkpoint) {
- Collection<FileStoreSourceSplit> splits =
- checkpoint == null
- ? new FileStoreSourceSplitGenerator()
- .createSplits(readBuilder.newScan().plan())
- : checkpoint.splits();
-
- return new StaticFileStoreSplitEnumerator(context, null, splits,
splitBatchSize);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 3af455a67..219f2c13a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,11 +18,8 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -37,28 +34,22 @@ public class StaticFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 3L;
- private final DataTable table;
- private final BatchDataTableScan.Factory scanFactory;
- private final Predicate predicate;
+ private final int splitBatchSize;
+ private final TableScanUtils.TableScanFactory scanFactory;
public StaticFileStoreSource(
- DataTable table,
- @Nullable int[][] projectedFields,
- @Nullable Predicate predicate,
- @Nullable Long limit) {
- this(table, projectedFields, predicate, limit, DataTable::newScan);
+ ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize)
{
+ this(readBuilder, limit, splitBatchSize, ReadBuilder::newScan);
}
public StaticFileStoreSource(
- DataTable table,
- @Nullable int[][] projectedFields,
- @Nullable Predicate predicate,
+ ReadBuilder readBuilder,
@Nullable Long limit,
- BatchDataTableScan.Factory scanFactory) {
-
super(table.newReadBuilder().withFilter(predicate).withProjection(projectedFields),
limit);
- this.table = table;
+ int splitBatchSize,
+ TableScanUtils.TableScanFactory scanFactory) {
+ super(readBuilder, limit);
+ this.splitBatchSize = splitBatchSize;
this.scanFactory = scanFactory;
- this.predicate = predicate;
}
@Override
@@ -70,24 +61,12 @@ public class StaticFileStoreSource extends FlinkSource {
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint>
restoreEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
- Collection<FileStoreSourceSplit> splits;
- if (checkpoint == null) {
- FileStoreSourceSplitGenerator splitGenerator = new
FileStoreSourceSplitGenerator();
- // read all splits from scan
- DataTableScan.DataFilePlan plan =
- scanFactory.create(table).withFilter(predicate).plan();
- splits = splitGenerator.createSplits(plan);
- } else {
- // restore from checkpoint
- splits = checkpoint.splits();
- }
+ Collection<FileStoreSourceSplit> splits =
+ checkpoint == null
+ ? new FileStoreSourceSplitGenerator()
+
.createSplits(scanFactory.create(readBuilder).plan())
+ : checkpoint.splits();
- return new StaticFileStoreSplitEnumerator(
- context,
- null,
- splits,
- table.coreOptions()
- .toConfiguration()
-
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
+ return new StaticFileStoreSplitEnumerator(context, null, splits,
splitBatchSize);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 89b1c0518..54361a622 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -91,7 +91,7 @@ public class StaticFileStoreSplitEnumerator
while (taskSplits != null && !taskSplits.isEmpty() &&
assignment.size() < splitBatchSize) {
assignment.add(taskSplits.poll());
}
- if (assignment != null && assignment.size() > 0) {
+ if (assignment.size() > 0) {
context.assignSplits(
new SplitsAssignment<>(Collections.singletonMap(subtask,
assignment)));
} else {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e0a9a5502..a4d911540 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -18,9 +18,12 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
@@ -33,15 +36,15 @@ import javax.annotation.Nullable;
/** A {@link FlinkTableSource} for system table. */
public class SystemTableSource extends FlinkTableSource {
- private final Table table;
private final boolean isStreamingMode;
private final int splitBatchSize;
- public SystemTableSource(Table table, boolean isStreamingMode, int
splitBatchSize) {
+ public SystemTableSource(Table table, boolean isStreamingMode) {
super(table);
- this.table = table;
this.isStreamingMode = isStreamingMode;
- this.splitBatchSize = splitBatchSize;
+ this.splitBatchSize =
+ Options.fromMap(table.options())
+
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
}
public SystemTableSource(
@@ -52,7 +55,6 @@ public class SystemTableSource extends FlinkTableSource {
@Nullable Long limit,
int splitBatchSize) {
super(table, predicate, projectFields, limit);
- this.table = table;
this.isStreamingMode = isStreamingMode;
this.splitBatchSize = splitBatchSize;
}
@@ -65,21 +67,13 @@ public class SystemTableSource extends FlinkTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
Source<RowData, ?, ?> source;
- if (table instanceof DataTable) {
- DataTable dataTable = (DataTable) table;
- source =
- isStreamingMode
- ? new ContinuousFileStoreSource(
- dataTable, projectFields, predicate, limit)
- : new StaticFileStoreSource(dataTable,
projectFields, predicate, limit);
+ ReadBuilder readBuilder =
+
table.newReadBuilder().withProjection(projectFields).withFilter(predicate);
+
+ if (isStreamingMode && table instanceof DataTable) {
+ source = new ContinuousFileStoreSource(readBuilder,
table.options(), limit);
} else {
- source =
- new SimpleSystemSource(
- table.newReadBuilder()
- .withFilter(predicate)
- .withProjection(projectFields),
- limit,
- splitBatchSize);
+ source = new StaticFileStoreSource(readBuilder, limit,
splitBatchSize);
}
return SourceProvider.of(source);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
new file mode 100644
index 000000000..9ab22f00c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.BatchDataTableScan;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
+import
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
+import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
+import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/** Utility methods for {@link TableScan}, such as validating and creating. */
+public class TableScanUtils {
+
+ public static void streamingReadingValidate(Table table) {
+ CoreOptions options = CoreOptions.fromMap(table.options());
+ CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+ HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
+ new HashMap<CoreOptions.MergeEngine, String>() {
+ {
+ put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial
update");
+ put(CoreOptions.MergeEngine.AGGREGATE,
"Pre-aggregate");
+ }
+ };
+ if (table.primaryKeys().size() > 0 &&
mergeEngineDesc.containsKey(mergeEngine)) {
+ switch (options.changelogProducer()) {
+ case NONE:
+ case INPUT:
+ throw new RuntimeException(
+ mergeEngineDesc.get(mergeEngine)
+ + " streaming reading is not supported.
You can use "
+ + "'lookup' or 'full-compaction' changelog
producer to support streaming reading.");
+ default:
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // TableScan factories
+ // ------------------------------------------------------------------------
+
+ /** Factory to create batch {@link TableScan}. */
+ public interface TableScanFactory extends Serializable {
+ TableScan create(ReadBuilder readBuilder);
+ }
+
+ /** Factory to create {@link StreamTableScan}. */
+ public interface StreamTableScanFactory extends Serializable {
+
+ StreamTableScan create(ReadBuilder readBuilder, @Nullable Long
nextSnapshotId);
+ }
+
+ public static StreamTableScanFactory defaultStreamScanFactory() {
+ return (builder, nextSnapshotId) -> {
+ StreamTableScan scan = builder.newStreamScan();
+ scan.restore(nextSnapshotId);
+ return scan;
+ };
+ }
+
+ public static TableScanFactory compactBatchScanFactory() {
+ return readBuilder -> {
+ BatchDataTableScan scan = (BatchDataTableScan)
readBuilder.newScan();
+ // static compactor source will compact all current files
+ scan.withStartingScanner(new FullStartingScanner());
+ return scan;
+ };
+ }
+
+ public static StreamTableScanFactory compactStreamScanFactory() {
+ return (readBuilder, nextSnapshotId) -> {
+ StreamDataTableScan scan = (StreamDataTableScan)
readBuilder.newStreamScan();
+ scan.withStartingScanner(new ContinuousCompactorStartingScanner())
+ .withFollowUpScanner(new
ContinuousCompactorFollowUpScanner())
+ .withBoundedChecker(BoundedChecker.neverEnd());
+ scan.restore(nextSnapshotId);
+ return scan;
+ };
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 1abb25c4e..38ad31d40 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -457,7 +457,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {
}
@Test
- public void testLookupPartialUpdateIllegal() throws Exception {
+ public void testLookupPartialUpdateIllegal() {
sql(
"CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1
INT, k2 INT) WITH"
+ "
('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
@@ -465,7 +465,8 @@ public class LookupJoinITCase extends CatalogITCaseBase {
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for
system_time as of T.proctime AS D ON T.i = D.i";
assertThatThrownBy(() -> sEnv.executeSql(query))
.hasRootCauseMessage(
- "Partial update continuous reading is not supported. "
+ "Partial update streaming"
+ + " reading is not supported. "
+ "You can use 'lookup' or 'full-compaction'
changelog producer to support streaming reading.");
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index b394a28cd..57d9fb118 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -57,7 +57,7 @@ import java.util.Map;
import java.util.UUID;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static
org.apache.paimon.flink.AbstractFlinkTableFactory.buildFileStoreTable;
+import static
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
@@ -1155,7 +1155,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
DynamicTableSink tableSink =
new FlinkTableSink(
- context.getObjectIdentifier(),
buildFileStoreTable(context), context, null);
+ context.getObjectIdentifier(),
buildPaimonTable(context), context, null);
assertThat(tableSink).isInstanceOf(FlinkTableSink.class);
// 2. get sink provider