This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b9527aba [FLINK-29736] Abstract a table interface for both data and metadata tables
b9527aba is described below
commit b9527abac18a7c1f4eba13dadfb07227ba23f4b8
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 25 21:53:58 2022 +0800
[FLINK-29736] Abstract a table interface for both data and metadata tables
This closes #330
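In short, the commit splits the old FileStoreTable/TableScan/Split classes into generic interfaces (which metadata tables can implement later) plus the data-table implementations. A condensed sketch of the new abstraction, taken from the diff below (method lists abridged):

    /** A table provides basic abstraction for table type and table scan and table read. */
    public interface Table extends Serializable {
        String name();
        RowType rowType();
        TableScan newScan();   // TableScan.plan() returns a Plan whose splits() yields generic Split instances
        TableRead newRead();
    }

    /** An interface for {@link Table} partition support. */
    public interface SupportsPartition {
        List<String> partitionKeys();
    }

    /** An interface for {@link Table} write support. */
    public interface SupportsWrite {
        TableWrite newWrite();
        TableCommit newCommit(String user);
    }

    /** Data tables keep their previous capabilities by composing the new interfaces;
        DataTableScan and DataSplit carry the old TableScan/Split behavior. */
    public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite { ... }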
---
.../store/connector/sink/StoreCompactOperator.java | 8 +-
.../source/ContinuousFileSplitEnumerator.java | 4 +-
.../store/connector/source/FileStoreSource.java | 6 +-
.../connector/source/FileStoreSourceSplit.java | 10 +-
.../source/FileStoreSourceSplitGenerator.java | 9 +-
.../source/FileStoreSourceSplitSerializer.java | 5 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 10 +-
.../source/FileStoreSourceSplitSerializerTest.java | 4 +-
.../flink/table/store/file/catalog/Catalog.java | 6 +-
.../file/operation/AppendOnlyFileStoreRead.java | 4 +-
.../file/operation/AppendOnlyFileStoreWrite.java | 4 +-
.../table/store/file/operation/FileStoreRead.java | 4 +-
.../file/operation/KeyValueFileStoreRead.java | 9 +-
.../store/table/AppendOnlyFileStoreTable.java | 9 +-
.../table/ChangelogValueCountFileStoreTable.java | 6 +-
.../table/ChangelogWithKeyFileStoreTable.java | 6 +-
.../flink/table/store/table/FileStoreTable.java | 28 +++--
.../SupportsPartition.java} | 21 +---
.../SupportsWrite.java} | 22 ++--
.../table/{FileStoreTable.java => Table.java} | 26 +----
.../table/source/{Split.java => DataSplit.java} | 47 ++++++--
.../source/{TableScan.java => DataTableScan.java} | 46 ++++----
.../store/table/source/KeyValueTableRead.java | 2 +-
.../store/table/source/SnapshotEnumerator.java | 10 +-
.../flink/table/store/table/source/Split.java | 88 +-------------
.../flink/table/store/table/source/TableScan.java | 130 ++-------------------
.../store/table/source/TableStreamingReader.java | 8 +-
.../flink/table/store/file/TestFileStore.java | 4 +-
.../file/operation/KeyValueFileStoreReadTest.java | 4 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 19 +--
.../ChangelogValueCountFileStoreTableTest.java | 14 +--
.../table/ChangelogWithKeyFileStoreTableTest.java | 27 ++---
.../table/store/table/FileStoreTableTestBase.java | 15 ++-
.../table/store/table/SchemaEvolutionTest.java | 2 +-
.../table/store/table/WritePreemptMemoryTest.java | 2 +-
.../flink/table/store/table/source/SplitTest.java | 6 +-
.../table/store/mapred/TableStoreInputFormat.java | 4 +-
.../table/store/mapred/TableStoreInputSplit.java | 10 +-
.../store/mapred/TableStoreInputSplitTest.java | 4 +-
.../store/mapred/TableStoreRecordReaderTest.java | 4 +-
.../table/store/spark/SparkInputPartition.java | 18 +--
.../table/store/spark/SparkReaderFactory.java | 9 +-
.../apache/flink/table/store/spark/SparkScan.java | 22 ++--
.../flink/table/store/spark/SparkScanBuilder.java | 10 +-
.../flink/table/store/spark/SparkSource.java | 4 +-
.../apache/flink/table/store/spark/SparkTable.java | 26 +++--
.../table/store/spark/SparkDataSourceReader.java | 19 ++-
.../table/store/spark/SparkInputPartition.java | 25 +---
.../flink/table/store/spark/SparkSource.java | 5 +-
49 files changed, 289 insertions(+), 496 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index a703f3fd..e26b0a98 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.types.logical.RowType;
@@ -75,9 +76,10 @@ public class StoreCompactOperator extends PrepareCommitOperator {
int task = getRuntimeContext().getIndexOfThisSubtask();
int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
- for (Split split : scan.plan().splits) {
- BinaryRowData partition = split.partition();
- int bucket = split.bucket();
+ for (Split split : scan.plan().splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ BinaryRowData partition = dataSplit.partition();
+ int bucket = dataSplit.bucket();
if (Math.abs(Objects.hash(partition, bucket)) % numTask != task) {
continue;
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 255392b0..14e84dd2 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -23,9 +23,9 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
-import org.apache.flink.table.store.table.source.TableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class ContinuousFileSplitEnumerator
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Path location,
- TableScan scan,
+ DataTableScan scan,
CoreOptions.ChangelogProducer changelogProducer,
Collection<FileStoreSourceSplit> remainSplits,
long currentSnapshotId,
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 1df4028d..2aff07c2 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -29,8 +29,8 @@ import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
import javax.annotation.Nullable;
@@ -105,7 +105,7 @@ public class FileStoreSource
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
SnapshotManager snapshotManager = table.snapshotManager();
- TableScan scan = table.newScan();
+ DataTableScan scan = table.newScan();
if (predicate != null) {
scan.withFilter(predicate);
}
@@ -121,7 +121,7 @@ public class FileStoreSource
snapshotId = snapshotManager.latestSnapshotId();
splits = new ArrayList<>();
} else {
- TableScan.Plan plan = scan.plan();
+ DataTableScan.DataFilePlan plan = scan.plan();
snapshotId = plan.snapshotId;
splits = new FileStoreSourceSplitGenerator().createSplits(plan);
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
index 4ff027ea..2356bab8 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import java.util.Objects;
@@ -29,21 +29,21 @@ public class FileStoreSourceSplit implements SourceSplit {
/** The unique ID of the split. Unique within the scope of this source. */
private final String id;
- private final Split split;
+ private final DataSplit split;
private final long recordsToSkip;
- public FileStoreSourceSplit(String id, Split split) {
+ public FileStoreSourceSplit(String id, DataSplit split) {
this(id, split, 0);
}
- public FileStoreSourceSplit(String id, Split split, long recordsToSkip) {
+ public FileStoreSourceSplit(String id, DataSplit split, long recordsToSkip) {
this.id = id;
this.split = split;
this.recordsToSkip = recordsToSkip;
}
- public Split split() {
+ public DataSplit split() {
return split;
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index dc6b9cd3..5ffc6725 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -18,7 +18,8 @@
package org.apache.flink.table.store.connector.source;
-import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
import java.util.List;
import java.util.stream.Collectors;
@@ -35,9 +36,9 @@ public class FileStoreSourceSplitGenerator {
*/
private final char[] currentId = "0000000000".toCharArray();
- public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
- return plan.splits.stream()
- .map(s -> new FileStoreSourceSplit(getNextId(), s))
+ public List<FileStoreSourceSplit> createSplits(DataTableScan.Plan plan) {
+ return plan.splits().stream()
+ .map(s -> new FileStoreSourceSplit(getNextId(), (DataSplit) s))
.collect(Collectors.toList());
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 796373b6..4c0d6d2f 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -48,6 +48,7 @@ public class FileStoreSourceSplitSerializer
@Override
public FileStoreSourceSplit deserialize(int version, byte[] serialized) throws IOException {
DataInputDeserializer view = new DataInputDeserializer(serialized);
- return new FileStoreSourceSplit(view.readUTF(), Split.deserialize(view), view.readLong());
+ return new FileStoreSourceSplit(
+ view.readUTF(), DataSplit.deserialize(view), view.readLong());
}
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 955c5640..4e645045 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -75,10 +75,10 @@ public class FileStoreSourceSplitGeneratorTest {
makeEntry(6, 1, "f14"));
}
};
- List<Split> scanSplits =
- TableScan.generateSplits(
+ List<DataSplit> scanSplits =
+ DataTableScan.generateSplits(
false, Collections::singletonList, plan.groupByPartFiles());
- TableScan.Plan tableScanPlan = new TableScan.Plan(1L, scanSplits);
+ DataTableScan.DataFilePlan tableScanPlan = new DataTableScan.DataFilePlan(1L, scanSplits);
List<FileStoreSourceSplit> splits =
new FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 760cf426..3ecc9983 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.junit.jupiter.api.Test;
@@ -118,7 +118,7 @@ public class FileStoreSourceSplitSerializerTest {
boolean isIncremental,
long recordsToSkip) {
return new FileStoreSourceSplit(
- id, new Split(partition, bucket, files, isIncremental), recordsToSkip);
+ id, new DataSplit(partition, bucket, files, isIncremental), recordsToSkip);
}
private static FileStoreSourceSplit serializeAndDeserialize(FileStoreSourceSplit split)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 2d3887c8..a84bb22f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.Table;
import java.util.List;
import java.util.Optional;
@@ -110,13 +110,13 @@ public interface Catalog extends AutoCloseable {
TableSchema getTableSchema(ObjectPath tablePath) throws TableNotExistException;
/**
- * Return a {@link FileStoreTable} identified by the given {@link ObjectPath}.
+ * Return a {@link Table} identified by the given {@link ObjectPath}.
*
* @param tablePath Path of the table
* @return The requested table
* @throws TableNotExistException if the target does not exist
*/
- default FileStoreTable getTable(ObjectPath tablePath) throws TableNotExistException {
+ default Table getTable(ObjectPath tablePath) throws TableNotExistException {
TableSchema tableSchema = getTableSchema(tablePath);
return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 29ea31be..cf1b35a9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.RowType;
@@ -82,7 +82,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
}
@Override
- public RecordReader<RowData> createReader(Split split) throws IOException {
+ public RecordReader<RowData> createReader(DataSplit split) throws IOException {
BulkFormat<RowData, FileSourceSplit> readerFactory =
fileFormat.createReaderFactory(rowType, projection, filters);
DataFilePathFactory dataFilePathFactory =
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 627746c2..41271012 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
@@ -139,7 +139,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
new LongCounter(toCompact.get(0).minSequenceNumber()));
rewriter.write(
new RecordReaderIterator<>(
- read.createReader(new Split(partition, bucket, toCompact, false))));
+ read.createReader(new DataSplit(partition, bucket, toCompact, false))));
rewriter.close();
return rewriter.result();
};
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index f540f0cb..adb59879 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import java.io.IOException;
@@ -34,5 +34,5 @@ public interface FileStoreRead<T> {
FileStoreRead<T> withFilter(Predicate predicate);
/** Create a {@link RecordReader} from split. */
- RecordReader<T> createReader(Split split) throws IOException;
+ RecordReader<T> createReader(DataSplit split) throws IOException;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 63ebb3ed..6914fbcb 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
@@ -125,7 +125,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
}
@Override
- public RecordReader<KeyValue> createReader(Split split) throws IOException {
+ public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
if (split.isIncremental()) {
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(
@@ -154,7 +154,8 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
}
}
- private List<ConcatRecordReader.ReaderSupplier<KeyValue>> createSectionReaders(Split split) {
+ private List<ConcatRecordReader.ReaderSupplier<KeyValue>> createSectionReaders(
+ DataSplit split) {
List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
MergeFunction<KeyValue> mergeFunc = mergeFunction.copy();
for (List<SortedRun> section :
@@ -169,7 +170,7 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
return sectionReaders;
}
- private KeyValueFileReaderFactory createReaderFactory(Split split, boolean overlapped) {
+ private KeyValueFileReaderFactory createReaderFactory(DataSplit split, boolean overlapped) {
return readerFactoryBuilder.build(
split.partition(),
split.bucket(),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index e1eaf354..4d638fb7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -34,10 +34,11 @@ import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
@@ -63,9 +64,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableScan newScan() {
+ public DataTableScan newScan() {
AppendOnlyFileStoreScan scan = store.newScan();
- return new TableScan(scan, tableSchema, store.pathFactory()) {
+ return new DataTableScan(scan, tableSchema, store.pathFactory()) {
@Override
protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
return new AppendOnlySplitGenerator(
@@ -97,7 +98,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@Override
public RecordReader<RowData> createReader(Split split) throws IOException {
- return read.createReader(split);
+ return read.createReader((DataSplit) split);
}
};
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 5479dd1e..ebde8da4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -36,11 +36,11 @@ import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -74,9 +74,9 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableScan newScan() {
+ public DataTableScan newScan() {
KeyValueFileStoreScan scan = store.newScan();
- return new TableScan(scan, tableSchema, store.pathFactory()) {
+ return new DataTableScan(scan, tableSchema, store.pathFactory()) {
@Override
protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
return new MergeTreeSplitGenerator(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 087f28af..1a0a8263 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -39,11 +39,11 @@ import org.apache.flink.table.store.table.sink.SequenceGenerator;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.LogicalType;
@@ -127,9 +127,9 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableScan newScan() {
+ public DataTableScan newScan() {
KeyValueFileStoreScan scan = store.newScan();
- return new TableScan(scan, tableSchema, store.pathFactory()) {
+ return new DataTableScan(scan, tableSchema, store.pathFactory()) {
@Override
protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
return new MergeTreeSplitGenerator(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 595a8524..d99019c3 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -24,28 +24,42 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.types.logical.RowType;
-import java.io.Serializable;
+import java.util.List;
/**
* An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
* and writing of {@link org.apache.flink.table.data.RowData}.
*/
-public interface FileStoreTable extends Serializable {
+public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
CoreOptions options();
Path location();
+ @Override
+ default String name() {
+ return location().getName();
+ }
+
+ @Override
+ default RowType rowType() {
+ return schema().logicalRowType();
+ }
+
+ @Override
+ default List<String> partitionKeys() {
+ return schema().partitionKeys();
+ }
+
TableSchema schema();
SnapshotManager snapshotManager();
- TableScan newScan();
-
- TableRead newRead();
+ @Override
+ DataTableScan newScan();
TableWrite newWrite();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsPartition.java
similarity index 58%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsPartition.java
index f540f0cb..7d2944ff 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsPartition.java
@@ -16,23 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.table;
-import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.source.Split;
+import java.util.List;
-import java.io.IOException;
+/** An interface for {@link Table} partition support. */
+public interface SupportsPartition {
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface FileStoreRead<T> {
-
- FileStoreRead<T> withFilter(Predicate predicate);
-
- /** Create a {@link RecordReader} from split. */
- RecordReader<T> createReader(Split split) throws IOException;
+ List<String> partitionKeys();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
similarity index 58%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
index f540f0cb..97dfa5e5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
@@ -16,23 +16,15 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.table;
-import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
-import java.io.IOException;
+/** An interface for {@link Table} write support. */
+public interface SupportsWrite {
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface FileStoreRead<T> {
-
- FileStoreRead<T> withFilter(Predicate predicate);
+ TableWrite newWrite();
- /** Create a {@link RecordReader} from split. */
- RecordReader<T> createReader(Split split) throws IOException;
+ TableCommit newCommit(String user);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
similarity index 58%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
index 595a8524..f950aaf3 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java
@@ -18,36 +18,20 @@
package org.apache.flink.table.store.table;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
-/**
- * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
- * and writing of {@link org.apache.flink.table.data.RowData}.
- */
-public interface FileStoreTable extends Serializable {
-
- CoreOptions options();
-
- Path location();
+/** A table provides basic abstraction for table type and table scan and table read. */
+public interface Table extends Serializable {
- TableSchema schema();
+ String name();
- SnapshotManager snapshotManager();
+ RowType rowType();
TableScan newScan();
TableRead newRead();
-
- TableWrite newWrite();
-
- TableCommit newCommit(String user);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
similarity index 69%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
index fd0e143e..8dfeac91 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataSplit.java
@@ -19,26 +19,37 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.SerializationUtils;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/** Input splits. Needed by most batch computation engines. */
-public class Split {
+public class DataSplit implements Split {
- private final BinaryRowData partition;
- private final int bucket;
- private final List<DataFileMeta> files;
- private final boolean isIncremental;
+ private static final long serialVersionUID = 1L;
- public Split(
+ private BinaryRowData partition;
+ private int bucket;
+ private List<DataFileMeta> files;
+ private boolean isIncremental;
+
+ public DataSplit(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
+ init(partition, bucket, files, isIncremental);
+ }
+
+ private void init(
BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
this.partition = partition;
this.bucket = bucket;
@@ -62,6 +73,15 @@ public class Split {
return isIncremental;
}
+ @Override
+ public long rowCount() {
+ long rowCount = 0;
+ for (DataFileMeta file : files) {
+ rowCount += file.rowCount();
+ }
+ return rowCount;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -70,7 +90,7 @@ public class Split {
if (o == null || getClass() != o.getClass()) {
return false;
}
- Split split = (Split) o;
+ DataSplit split = (DataSplit) o;
return bucket == split.bucket
&& Objects.equals(partition, split.partition)
&& Objects.equals(files, split.files)
@@ -82,6 +102,15 @@ public class Split {
return Objects.hash(partition, bucket, files, isIncremental);
}
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ serialize(new DataOutputViewStreamWrapper(out));
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ DataSplit split = DataSplit.deserialize(new DataInputViewStreamWrapper(in));
+ init(split.partition, split.bucket, split.files, split.isIncremental);
+ }
+
public void serialize(DataOutputView out) throws IOException {
SerializationUtils.serializeBinaryRow(partition, out);
out.writeInt(bucket);
@@ -93,7 +122,7 @@ public class Split {
out.writeBoolean(isIncremental);
}
- public static Split deserialize(DataInputView in) throws IOException {
+ public static DataSplit deserialize(DataInputView in) throws IOException {
BinaryRowData partition = SerializationUtils.deserializeBinaryRow(in);
int bucket = in.readInt();
int fileNumber = in.readInt();
@@ -102,6 +131,6 @@ public class Split {
for (int i = 0; i < fileNumber; i++) {
files.add(dataFileSer.deserialize(in));
}
- return new Split(partition, bucket, files, in.readBoolean());
+ return new DataSplit(partition, bucket, files, in.readBoolean());
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
similarity index 81%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 64baa300..166c8096 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -37,7 +37,7 @@ import java.util.Optional;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class TableScan {
+public abstract class DataTableScan implements TableScan {
private final FileStoreScan scan;
private final TableSchema tableSchema;
@@ -45,26 +45,20 @@ public abstract class TableScan {
private boolean isIncremental = false;
- protected TableScan(
+ protected DataTableScan(
FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory pathFactory) {
this.scan = scan;
this.tableSchema = tableSchema;
this.pathFactory = pathFactory;
}
- public TableScan withSnapshot(long snapshotId) {
+ public DataTableScan withSnapshot(long snapshotId) {
scan.withSnapshot(snapshotId);
return this;
}
- public TableScan withFilter(List<Predicate> predicates) {
- if (predicates == null || predicates.isEmpty()) {
- return this;
- }
- return withFilter(PredicateBuilder.and(predicates));
- }
-
- public TableScan withFilter(Predicate predicate) {
+ @Override
+ public DataTableScan withFilter(Predicate predicate) {
List<String> partitionKeys = tableSchema.partitionKeys();
int[] fieldIdxToPartitionIdx =
tableSchema.fields().stream()
@@ -91,34 +85,35 @@ public abstract class TableScan {
return this;
}
- public TableScan withIncremental(boolean isIncremental) {
+ public DataTableScan withIncremental(boolean isIncremental) {
this.isIncremental = isIncremental;
scan.withIncremental(isIncremental);
return this;
}
@VisibleForTesting
- public TableScan withBucket(int bucket) {
+ public DataTableScan withBucket(int bucket) {
scan.withBucket(bucket);
return this;
}
- public Plan plan() {
+ @Override
+ public DataFilePlan plan() {
FileStoreScan.Plan plan = scan.plan();
- return new Plan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
+ return new DataFilePlan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
}
- private List<Split> generateSplits(
+ private List<DataSplit> generateSplits(
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
return generateSplits(isIncremental, splitGenerator(pathFactory), groupedDataFiles);
}
@VisibleForTesting
- public static List<Split> generateSplits(
+ public static List<DataSplit> generateSplits(
boolean isIncremental,
SplitGenerator splitGenerator,
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- List<Split> splits = new ArrayList<>();
+ List<DataSplit> splits = new ArrayList<>();
for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
groupedDataFiles.entrySet()) {
BinaryRowData partition = entry.getKey();
@@ -127,10 +122,10 @@ public abstract class TableScan {
int bucket = bucketEntry.getKey();
if (isIncremental) {
// Don't split when incremental
- splits.add(new Split(partition, bucket, bucketEntry.getValue(), true));
+ splits.add(new DataSplit(partition, bucket, bucketEntry.getValue(), true));
} else {
splitGenerator.split(bucketEntry.getValue()).stream()
- .map(files -> new Split(partition, bucket, files, false))
+ .map(files -> new DataSplit(partition, bucket, files, false))
.forEach(splits::add);
}
}
@@ -143,15 +138,20 @@ public abstract class TableScan {
protected abstract void withNonPartitionFilter(Predicate predicate);
/** Scanning plan containing snapshot ID and input splits. */
- public static class Plan {
+ public static class DataFilePlan implements Plan {
@Nullable public final Long snapshotId;
- public final List<Split> splits;
+ public final List<DataSplit> splits;
@VisibleForTesting
- public Plan(@Nullable Long snapshotId, List<Split> splits) {
+ public DataFilePlan(@Nullable Long snapshotId, List<DataSplit> splits) {
this.snapshotId = snapshotId;
this.splits = splits;
}
+
+ @Override
+ public List<Split> splits() {
+ return (List) splits;
+ }
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
index c3939a4d..8a8465c4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
@@ -40,7 +40,7 @@ public abstract class KeyValueTableRead implements TableRead {
@Override
public RecordReader<RowData> createReader(Split split) throws IOException {
- return new RowDataRecordReader(read.createReader(split));
+ return new RowDataRecordReader(read.createReader((DataSplit) split));
}
protected abstract RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
index 40ebf1ee..edbaa1bc 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -36,14 +36,14 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
private static final Logger LOG = LoggerFactory.getLogger(SnapshotEnumerator.class);
private final SnapshotManager snapshotManager;
- private final TableScan scan;
+ private final DataTableScan scan;
private final CoreOptions.ChangelogProducer changelogProducer;
private long nextSnapshotId;
public SnapshotEnumerator(
Path tablePath,
- TableScan scan,
+ DataTableScan scan,
CoreOptions.ChangelogProducer changelogProducer,
long currentSnapshot) {
this.snapshotManager = new SnapshotManager(tablePath);
@@ -85,7 +85,7 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
continue;
}
- TableScan.Plan plan = scan.withSnapshot(nextSnapshotId).plan();
+ DataTableScan.DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan();
EnumeratorResult result = new EnumeratorResult(nextSnapshotId, plan);
LOG.debug("Find snapshot id {}.", nextSnapshotId);
@@ -99,9 +99,9 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
public final long snapshotId;
- public final TableScan.Plan plan;
+ public final DataTableScan.DataFilePlan plan;
- private EnumeratorResult(long snapshotId, TableScan.Plan plan) {
+ private EnumeratorResult(long snapshotId, DataTableScan.DataFilePlan plan) {
this.snapshotId = snapshotId;
this.plan = plan;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
index fd0e143e..978756d9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
@@ -18,90 +18,10 @@
package org.apache.flink.table.store.table.source;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
-import org.apache.flink.table.store.file.utils.SerializationUtils;
+import java.io.Serializable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+/** An input split for reading. */
+public interface Split extends Serializable {
-/** Input splits. Needed by most batch computation engines. */
-public class Split {
-
- private final BinaryRowData partition;
- private final int bucket;
- private final List<DataFileMeta> files;
- private final boolean isIncremental;
-
- public Split(
- BinaryRowData partition, int bucket, List<DataFileMeta> files, boolean isIncremental) {
- this.partition = partition;
- this.bucket = bucket;
- this.files = files;
- this.isIncremental = isIncremental;
- }
-
- public BinaryRowData partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public List<DataFileMeta> files() {
- return files;
- }
-
- public boolean isIncremental() {
- return isIncremental;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Split split = (Split) o;
- return bucket == split.bucket
- && Objects.equals(partition, split.partition)
- && Objects.equals(files, split.files)
- && isIncremental == split.isIncremental;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partition, bucket, files, isIncremental);
- }
-
- public void serialize(DataOutputView out) throws IOException {
- SerializationUtils.serializeBinaryRow(partition, out);
- out.writeInt(bucket);
- out.writeInt(files.size());
- DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
- for (DataFileMeta file : files) {
- dataFileSer.serialize(file, out);
- }
- out.writeBoolean(isIncremental);
- }
-
- public static Split deserialize(DataInputView in) throws IOException {
- BinaryRowData partition = SerializationUtils.deserializeBinaryRow(in);
- int bucket = in.readInt();
- int fileNumber = in.readInt();
- List<DataFileMeta> files = new ArrayList<>(fileNumber);
- DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
- for (int i = 0; i < fileNumber; i++) {
- files.add(dataFileSer.deserialize(in));
- }
- return new Split(partition, bucket, files, in.readBoolean());
- }
+ long rowCount();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 64baa300..68111fc2 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -18,140 +18,28 @@
package org.apache.flink.table.store.table.source;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.table.Table;
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
-
-/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class TableScan {
-
- private final FileStoreScan scan;
- private final TableSchema tableSchema;
- private final FileStorePathFactory pathFactory;
- private boolean isIncremental = false;
-
- protected TableScan(
- FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory pathFactory) {
- this.scan = scan;
- this.tableSchema = tableSchema;
- this.pathFactory = pathFactory;
- }
-
- public TableScan withSnapshot(long snapshotId) {
- scan.withSnapshot(snapshotId);
- return this;
- }
+/** A scan of {@link Table} to generate {@link Split} splits. */
+public interface TableScan {
- public TableScan withFilter(List<Predicate> predicates) {
+ default TableScan withFilter(List<Predicate> predicates) {
if (predicates == null || predicates.isEmpty()) {
return this;
}
return withFilter(PredicateBuilder.and(predicates));
}
- public TableScan withFilter(Predicate predicate) {
- List<String> partitionKeys = tableSchema.partitionKeys();
- int[] fieldIdxToPartitionIdx =
- tableSchema.fields().stream()
- .mapToInt(f -> partitionKeys.indexOf(f.name()))
- .toArray();
+ TableScan withFilter(Predicate predicate);
- List<Predicate> partitionFilters = new ArrayList<>();
- List<Predicate> nonPartitionFilters = new ArrayList<>();
- for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
- Optional<Predicate> mapped = transformFieldMapping(p, fieldIdxToPartitionIdx);
- if (mapped.isPresent()) {
- partitionFilters.add(mapped.get());
- } else {
- nonPartitionFilters.add(p);
- }
- }
-
- if (partitionFilters.size() > 0) {
- scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
- }
- if (nonPartitionFilters.size() > 0) {
- withNonPartitionFilter(PredicateBuilder.and(nonPartitionFilters));
- }
- return this;
- }
-
- public TableScan withIncremental(boolean isIncremental) {
- this.isIncremental = isIncremental;
- scan.withIncremental(isIncremental);
- return this;
- }
+ Plan plan();
- @VisibleForTesting
- public TableScan withBucket(int bucket) {
- scan.withBucket(bucket);
- return this;
- }
-
- public Plan plan() {
- FileStoreScan.Plan plan = scan.plan();
- return new Plan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
- }
-
- private List<Split> generateSplits(
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- return generateSplits(isIncremental, splitGenerator(pathFactory), groupedDataFiles);
- }
-
- @VisibleForTesting
- public static List<Split> generateSplits(
- boolean isIncremental,
- SplitGenerator splitGenerator,
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- List<Split> splits = new ArrayList<>();
- for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
- groupedDataFiles.entrySet()) {
- BinaryRowData partition = entry.getKey();
- Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
- for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
- int bucket = bucketEntry.getKey();
- if (isIncremental) {
- // Don't split when incremental
- splits.add(new Split(partition, bucket, bucketEntry.getValue(), true));
- } else {
- splitGenerator.split(bucketEntry.getValue()).stream()
- .map(files -> new Split(partition, bucket, files, false))
- .forEach(splits::add);
- }
- }
- }
- return splits;
- }
-
- protected abstract SplitGenerator splitGenerator(FileStorePathFactory pathFactory);
-
- protected abstract void withNonPartitionFilter(Predicate predicate);
-
- /** Scanning plan containing snapshot ID and input splits. */
- public static class Plan {
-
- @Nullable public final Long snapshotId;
- public final List<Split> splits;
-
- @VisibleForTesting
- public Plan(@Nullable Long snapshotId, List<Split> splits) {
- this.snapshotId = snapshotId;
- this.splits = splits;
- }
+ /** Plan of scan. */
+ interface Plan {
+ List<Split> splits();
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index 49fc4618..a00bed89 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -84,11 +84,11 @@ public class TableStreamingReader {
@Nullable
public Iterator<RowData> nextBatch() throws IOException {
if (enumerator == null) {
- TableScan scan = table.newScan();
+ DataTableScan scan = table.newScan();
if (predicate != null) {
scan.withFilter(predicate);
}
- TableScan.Plan plan = scan.plan();
+ DataTableScan.DataFilePlan plan = scan.plan();
if (plan.snapshotId == null) {
return null;
}
@@ -109,14 +109,14 @@ public class TableStreamingReader {
}
}
- private Iterator<RowData> read(TableScan.Plan plan) throws IOException {
+ private Iterator<RowData> read(DataTableScan.DataFilePlan plan) throws IOException {
TableRead read = table.newRead().withProjection(projection);
if (predicate != null) {
read.withFilter(predicate);
}
List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new ArrayList<>();
- for (Split split : plan.splits) {
+ for (DataSplit split : plan.splits) {
readers.add(() -> read.createReader(split));
}
Iterator<RowData> iterator = new RecordReaderIterator<>(ConcatRecordReader.create(readers));
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index e55d225a..0e471d9f 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -44,7 +44,7 @@ import
org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.FileCommittable;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
@@ -297,7 +297,7 @@ public class TestFileStore extends KeyValueFileStore {
RecordReaderIterator<KeyValue> iterator =
new RecordReaderIterator<>(
read.createReader(
- new Split(
+ new DataSplit(
entryWithPartition.getKey(),
entryWithBucket.getKey(),
entryWithBucket.getValue(),
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index 3f32e168..1dcc2abe 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -208,7 +208,7 @@ public class KeyValueFileStoreReadTest {
filesGroupedByPartition.entrySet()) {
RecordReader<KeyValue> reader =
read.createReader(
- new Split(
+ new DataSplit(
entry.getKey(),
0,
entry.getValue().stream()
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 072eec88..79296b4a 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -58,7 +59,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.hasSameElementsAs(
@@ -82,7 +83,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_PROJECTED_ROW_TO_STRING))
.hasSameElementsAs(Arrays.asList("100|10", "101|11", "102|12",
"101|11", "102|12"));
@@ -97,7 +98,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
Predicate predicate = builder.equal(2, 201L);
- List<Split> splits =
table.newScan().withFilter(predicate).plan().splits;
+ List<Split> splits =
table.newScan().withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
BATCH_ROW_TO_STRING))
@@ -114,7 +115,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -128,7 +129,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("+101|11", "+102|12"));
@@ -144,7 +145,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate predicate = builder.equal(2, 101L);
List<Split> splits =
- table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+ table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -197,7 +198,11 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate partitionFilter =
new PredicateBuilder(table.schema().logicalRowType()).equal(0,
partition);
List<Split> splits =
- table.newScan().withFilter(partitionFilter).withBucket(bucket).plan().splits;
+ ((DataTableScan) table.newScan())
+ .withFilter(partitionFilter)
+ .withBucket(bucket)
+ .plan()
+ .splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(partition), bucket,
STREAMING_ROW_TO_STRING))
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 21aaa6ac..3b4806b8 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -51,7 +51,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.isEqualTo(
@@ -72,7 +72,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("101|11", "101|11", "102|12"));
@@ -87,7 +87,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
Predicate predicate = builder.equal(2, 201L);
- List<Split> splits =
table.newScan().withFilter(predicate).plan().splits;
+ List<Split> splits =
table.newScan().withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
BATCH_ROW_TO_STRING))
@@ -103,7 +103,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -121,7 +121,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("-100|10", "+101|11"));
@@ -137,7 +137,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
Predicate predicate = builder.equal(2, 201L);
List<Split> splits =
- table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+ table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -188,7 +188,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
write.close();
// check that no data file is produced
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
assertThat(splits).isEmpty();
// check that no changelog file is produced
Path bucketPath = DataFilePathFactory.bucketPath(table.location(),
"1", 0);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 5cb9659c..b0d96233 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -62,7 +63,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.commit("1", write.prepareCommit(true));
write.close();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.isEqualTo(Arrays.asList("1|10|200|binary|varbinary",
"1|11|101|binary|varbinary"));
@@ -73,7 +74,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary"));
@@ -87,7 +88,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_PROJECTED_ROW_TO_STRING))
.isEqualTo(Collections.singletonList("1000|10"));
@@ -102,7 +103,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(1, 21));
- List<Split> splits =
table.newScan().withFilter(predicate).plan().splits;
+ List<Split> splits =
table.newScan().withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
BATCH_ROW_TO_STRING))
@@ -118,7 +119,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary"));
@@ -135,7 +136,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
writeData();
FileStoreTable table = createFileStoreTable();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -152,7 +153,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(1, 21));
List<Split> splits =
- table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+ table.newScan().withIncremental(true).withFilter(predicate).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -183,7 +184,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.commit("0", write.prepareCommit(true));
write.close();
- List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
+ List<Split> splits =
table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
CHANGELOG_ROW_TO_STRING))
.containsExactlyInAnyOrder(
@@ -251,7 +252,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
SnapshotEnumerator enumerator =
new SnapshotEnumerator(
tablePath,
- table.newScan().withIncremental(true),
+ (DataTableScan) table.newScan().withIncremental(true),
ChangelogProducer.INPUT,
Snapshot.FIRST_SNAPSHOT_ID - 1);
@@ -259,7 +260,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
SnapshotEnumerator.EnumeratorResult result = enumerator.call();
assertThat(result).isNotNull();
- List<Split> splits = result.plan.splits;
+ List<Split> splits = result.plan.splits();
TableRead read = table.newRead();
for (int j = 0; j < 2; j++) {
assertThat(getResult(read, splits, binaryRow(j + 1), 0,
CHANGELOG_ROW_TO_STRING))
@@ -317,7 +318,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.commit("2", write.prepareCommit(true));
PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
// push down key filter a = 30
TableRead read = table.newRead().withFilter(builder.equal(1, 30));
@@ -352,7 +353,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.close();
// cannot push down value filter b = 600L
- splits = table.newScan().plan().splits;
+ splits = table.newScan().plan().splits();
read = table.newRead().withFilter(builder.equal(2, 600L));
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.hasSameElementsAs(
@@ -384,7 +385,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.commit("0", write.prepareCommit(true));
write.close();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.isEqualTo(Collections.singletonList("1|10|200|binary|varbinary"));
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 152b0c27..94ed642c 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -39,6 +39,7 @@ import
org.apache.flink.table.store.file.utils.TraceableFileSystem;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.types.logical.LogicalType;
@@ -147,7 +148,7 @@ public abstract class FileStoreTableTestBase {
commit.withOverwritePartition(overwritePartition).commit("1",
write.prepareCommit(true));
write.close();
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.hasSameElementsAs(Collections.singletonList("1|10|100|binary|varbinary"));
@@ -177,9 +178,9 @@ public abstract class FileStoreTableTestBase {
table.newScan()
.withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5))
.plan()
- .splits;
+ .splits();
assertThat(splits.size()).isEqualTo(1);
- assertThat(splits.get(0).bucket()).isEqualTo(1);
+ assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
}
@Test
@@ -204,7 +205,7 @@ public abstract class FileStoreTableTestBase {
write.close();
PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
assertThat(getResult(read, splits, binaryRow(1), 0,
BATCH_ROW_TO_STRING))
.hasSameElementsAs(
@@ -278,7 +279,8 @@ public abstract class FileStoreTableTestBase {
write.close();
List<DataFileMeta> files =
- table.newScan().plan().splits.stream()
+ table.newScan().plan().splits().stream()
+ .map(split -> (DataSplit) split)
.flatMap(split -> split.files().stream())
.collect(Collectors.toList());
for (DataFileMeta file : files) {
@@ -319,7 +321,8 @@ public abstract class FileStoreTableTestBase {
private List<Split> getSplitsFor(List<Split> splits, BinaryRowData
partition, int bucket) {
List<Split> result = new ArrayList<>();
for (Split split : splits) {
- if (split.partition().equals(partition) && split.bucket() ==
bucket) {
+ DataSplit dataSplit = (DataSplit) split;
+ if (dataSplit.partition().equals(partition) && dataSplit.bucket()
== bucket) {
result.add(split);
}
}
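The test changes above illustrate the general pattern for code that receives generic splits but needs file-store details: downcast to DataSplit. A small sketch, assuming "partition" and "bucket" are pre-existing fixtures as in the test base:

    // partition(), bucket() and files() live on DataSplit, not on the
    // generic Split interface, so consumers cast before using them.
    for (Split split : table.newScan().plan().splits()) {
        DataSplit dataSplit = (DataSplit) split;
        if (dataSplit.partition().equals(partition) && dataSplit.bucket() == bucket) {
            List<DataFileMeta> files = dataSplit.files();
            // ... inspect the data files of this bucket
        }
    }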
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index d52df819..025ecb93 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -161,7 +161,7 @@ public class SchemaEvolutionTest {
if (filter != null) {
scan.withFilter(filter);
}
- for (Split split : scan.plan().splits) {
+ for (Split split : scan.plan().splits()) {
TableRead read = table.newRead();
if (filter != null) {
read.withFilter(filter);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 353ac24e..b6a09c4a 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -77,7 +77,7 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
write.close();
// read
- List<Split> splits = table.newScan().plan().splits;
+ List<Split> splits = table.newScan().plan().splits();
TableRead read = table.newRead();
List<String> results = new ArrayList<>();
for (int i = 0; i < 5; i++) {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
index 0ac385e4..d2cbb40f 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link Split}. */
+/** Test for {@link DataSplit}. */
public class SplitTest {
@Test
@@ -44,12 +44,12 @@ public class SplitTest {
for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
files.add(gen.next().meta);
}
- Split split = new Split(data.partition, data.bucket, files, false);
+ DataSplit split = new DataSplit(data.partition, data.bucket, files,
false);
ByteArrayOutputStream out = new ByteArrayOutputStream();
split.serialize(new DataOutputViewStreamWrapper(out));
- Split newSplit = Split.deserialize(new
DataInputDeserializer(out.toByteArray()));
+ DataSplit newSplit = DataSplit.deserialize(new
DataInputDeserializer(out.toByteArray()));
assertThat(newSplit).isEqualTo(split);
}
}
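Condensed from the test above, the serialization round trip that DataSplit keeps from the old Split (sketch; "partition", "bucket" and "files" are the test fixtures):

    DataSplit split = new DataSplit(partition, bucket, files, false);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    split.serialize(new DataOutputViewStreamWrapper(out));
    DataSplit copy = DataSplit.deserialize(new DataInputDeserializer(out.toByteArray()));
    // copy.equals(split) holds, which is what SplitTest asserts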
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index f35b29e6..34cd506c 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -27,8 +27,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -52,7 +52,7 @@ public class TableStoreInputFormat implements
InputFormat<Void, RowDataContainer
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
FileStoreTable table = createFileStoreTable(jobConf);
- TableScan scan = table.newScan();
+ DataTableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
return scan.plan().splits.stream()
.map(split -> new
TableStoreInputSplit(table.location().toString(), split))
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index eeb3fc1e..ad5f36c2 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.mapred;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -40,17 +40,17 @@ public class TableStoreInputSplit extends FileSplit {
private static final String[] ANYWHERE = new String[] {"*"};
private String path;
- private Split split;
+ private DataSplit split;
// public no-argument constructor for deserialization
public TableStoreInputSplit() {}
- public TableStoreInputSplit(String path, Split split) {
+ public TableStoreInputSplit(String path, DataSplit split) {
this.path = path;
this.split = split;
}
- public Split split() {
+ public DataSplit split() {
return split;
}
@@ -89,7 +89,7 @@ public class TableStoreInputSplit extends FileSplit {
int length = dataInput.readInt();
byte[] bytes = new byte[length];
dataInput.readFully(bytes);
- split = Split.deserialize(new DataInputDeserializer(bytes));
+ split = DataSplit.deserialize(new DataInputDeserializer(bytes));
}
@Override
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
index 962fb63f..ef3c3042 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.mapred;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.io.DataFileTestDataGenerator;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -54,7 +54,7 @@ public class TableStoreInputSplitTest {
TableStoreInputSplit split =
new TableStoreInputSplit(
tempDir.toString(),
- new Split(
+ new DataSplit(
wantedPartition,
0,
generated.stream()
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index e8106dc3..08186888 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@@ -186,7 +186,7 @@ public class TableStoreRecordReaderTest {
private TableStoreRecordReader read(
FileStoreTable table, BinaryRowData partition, int bucket,
List<String> selectedColumns)
throws Exception {
- for (Split split : table.newScan().plan().splits) {
+ for (DataSplit split : table.newScan().plan().splits) {
if (split.partition().equals(partition) && split.bucket() ==
bucket) {
return new TableStoreRecordReader(
table.newRead(),
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
index 33e7fa51..4f992945 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
@@ -18,22 +18,16 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.store.table.source.Split;
import org.apache.spark.sql.connector.read.InputPartition;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
/** A Spark {@link InputPartition} for table store. */
public class SparkInputPartition implements InputPartition {
private static final long serialVersionUID = 1L;
- private transient Split split;
+ private final Split split;
public SparkInputPartition(Split split) {
this.split = split;
@@ -42,14 +36,4 @@ public class SparkInputPartition implements InputPartition {
public Split split() {
return split;
}
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- split.serialize(new DataOutputViewStreamWrapper(out));
- }
-
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- in.defaultReadObject();
- split = Split.deserialize(new DataInputViewStreamWrapper(in));
- }
}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index c94db2eb..54b119ea 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -21,7 +21,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -42,19 +42,18 @@ public class SparkReaderFactory implements
PartitionReaderFactory {
private static final long serialVersionUID = 1L;
- private final FileStoreTable table;
+ private final Table table;
private final int[] projectedFields;
private final List<Predicate> predicates;
- public SparkReaderFactory(
- FileStoreTable table, int[] projectedFields, List<Predicate>
predicates) {
+ public SparkReaderFactory(Table table, int[] projectedFields,
List<Predicate> predicates) {
this.table = table;
this.projectedFields = projectedFields;
this.predicates = predicates;
}
private RowType readRowType() {
- return TypeUtils.project(table.schema().logicalRowType(),
projectedFields);
+ return TypeUtils.project(table.rowType(), projectedFields);
}
@Override
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index 88b63127..576373d9 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -18,9 +18,8 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.utils.TypeUtils;
@@ -43,13 +42,13 @@ import java.util.OptionalLong;
*/
public class SparkScan implements Scan, SupportsReportStatistics {
- protected final FileStoreTable table;
+ protected final Table table;
private final List<Predicate> predicates;
private final int[] projectedFields;
private List<Split> splits;
- public SparkScan(FileStoreTable table, List<Predicate> predicates, int[]
projectedFields) {
+ public SparkScan(Table table, List<Predicate> predicates, int[]
projectedFields) {
this.table = table;
this.predicates = predicates;
this.projectedFields = projectedFields;
@@ -58,13 +57,12 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
@Override
public String description() {
// TODO add filters
- return String.format("tablestore(%s)", table.location().getName());
+ return String.format("tablestore(%s)", table.name());
}
@Override
public StructType readSchema() {
- return SparkTypeUtils.fromFlinkRowType(
- TypeUtils.project(table.schema().logicalRowType(),
projectedFields));
+ return
SparkTypeUtils.fromFlinkRowType(TypeUtils.project(table.rowType(),
projectedFields));
}
@Override
@@ -86,7 +84,7 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
protected List<Split> splits() {
if (splits == null) {
- this.splits = table.newScan().withFilter(predicates).plan().splits;
+ this.splits =
table.newScan().withFilter(predicates).plan().splits();
}
return splits;
}
@@ -96,9 +94,7 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
long rowCount = 0L;
for (Split split : splits()) {
- for (DataFileMeta file : split.files()) {
- rowCount += file.rowCount();
- }
+ rowCount += split.rowCount();
}
final long numRows = rowCount;
@@ -128,13 +124,13 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
}
SparkScan that = (SparkScan) o;
- return table.location().equals(that.table.location())
+ return table.name().equals(that.table.name())
&& readSchema().equals(that.readSchema())
&& predicates.equals(that.predicates);
}
@Override
public int hashCode() {
- return Objects.hash(table.location(), readSchema(), predicates);
+ return Objects.hash(table.name(), readSchema(), predicates);
}
}
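The statistics change above moves row counting onto the split itself. A sketch of the estimate built only from calls visible in this diff; the surrounding method and the OptionalLong wrapping are illustrative:

    // Row-count estimate over the planned splits: Split.rowCount() replaces
    // summing DataFileMeta.rowCount() file by file.
    long rowCount = 0L;
    for (Split split : table.newScan().withFilter(predicates).plan().splits()) {
        rowCount += split.rowCount();
    }
    OptionalLong numRows = OptionalLong.of(rowCount);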
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
index be639c1d..ae7faaea 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScanBuilder.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -35,19 +35,19 @@ import java.util.List;
public class SparkScanBuilder
implements ScanBuilder, SupportsPushDownFilters,
SupportsPushDownRequiredColumns {
- private final FileStoreTable table;
+ private final Table table;
private List<Predicate> predicates = new ArrayList<>();
private Filter[] pushedFilters;
private int[] projectedFields;
- public SparkScanBuilder(FileStoreTable table) {
+ public SparkScanBuilder(Table table) {
this.table = table;
}
@Override
public Filter[] pushFilters(Filter[] filters) {
- SparkFilterConverter converter = new
SparkFilterConverter(table.schema().logicalRowType());
+ SparkFilterConverter converter = new
SparkFilterConverter(table.rowType());
List<Predicate> predicates = new ArrayList<>();
List<Filter> pushed = new ArrayList<>();
for (Filter filter : filters) {
@@ -70,7 +70,7 @@ public class SparkScanBuilder
@Override
public void pruneColumns(StructType requiredSchema) {
String[] pruneFields = requiredSchema.fieldNames();
- List<String> fieldNames = table.schema().fieldNames();
+ List<String> fieldNames = table.rowType().getFieldNames();
int[] projected = new int[pruneFields.length];
for (int i = 0; i < projected.length; i++) {
projected[i] = fieldNames.indexOf(pruneFields[i]);
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 90468627..87887a2b 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.connector.catalog.Table;
@@ -62,7 +61,6 @@ public class SparkSource implements DataSourceRegister,
TableProvider {
@Override
public Table getTable(
StructType schema, Transform[] partitioning, Map<String, String>
options) {
- FileStoreTable table =
FileStoreTableFactory.create(Configuration.fromMap(options));
- return new SparkTable(table);
+ return new
SparkTable(FileStoreTableFactory.create(Configuration.fromMap(options)));
}
}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 8cf39668..aaebaeff 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -18,9 +18,9 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.SupportsPartition;
+import org.apache.flink.table.store.table.Table;
-import org.apache.spark.sql.catalog.Table;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -33,12 +33,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.HashSet;
import java.util.Set;
-/** A spark {@link Table} for table store. */
+/** A spark {@link org.apache.spark.sql.connector.catalog.Table} for table
store. */
public class SparkTable implements
org.apache.spark.sql.connector.catalog.Table, SupportsRead {
- private final FileStoreTable table;
+ private final Table table;
- public SparkTable(FileStoreTable table) {
+ public SparkTable(Table table) {
this.table = table;
}
@@ -50,12 +50,12 @@ public class SparkTable implements
org.apache.spark.sql.connector.catalog.Table,
@Override
public String name() {
- return table.location().getName();
+ return table.name();
}
@Override
public StructType schema() {
- return
SparkTypeUtils.fromFlinkRowType(table.schema().logicalRowType());
+ return SparkTypeUtils.fromFlinkRowType(table.rowType());
}
@Override
@@ -67,9 +67,13 @@ public class SparkTable implements
org.apache.spark.sql.connector.catalog.Table,
@Override
public Transform[] partitioning() {
- return table.schema().partitionKeys().stream()
- .map(FieldReference::apply)
- .map(IdentityTransform::apply)
- .toArray(Transform[]::new);
+ if (table instanceof SupportsPartition) {
+ return ((SupportsPartition) table)
+ .partitionKeys().stream()
+ .map(FieldReference::apply)
+ .map(IdentityTransform::apply)
+ .toArray(Transform[]::new);
+ }
+ return new Transform[0];
}
}
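Because SparkTable now holds the generic Table, partition information is discovered through a capability interface rather than the schema. A minimal consumer-side sketch, assuming nothing beyond the SupportsPartition interface shown in this diff (Collections.emptyList() is just an illustrative fallback):

    // Only tables implementing SupportsPartition expose partition keys;
    // everything else is treated as unpartitioned.
    List<String> partitionKeys =
            table instanceof SupportsPartition
                    ? ((SupportsPartition) table).partitionKeys()
                    : Collections.emptyList();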
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
index 2a056add..96555ee7 100644
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
+++
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkDataSourceReader.java
@@ -18,9 +18,8 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -46,20 +45,20 @@ public class SparkDataSourceReader
SupportsPushDownRequiredColumns,
SupportsReportStatistics {
- private final FileStoreTable table;
+ private final Table table;
private List<Predicate> predicates = new ArrayList<>();
private Filter[] pushedFilters;
private int[] projectedFields;
private List<Split> splits;
- public SparkDataSourceReader(FileStoreTable table) {
+ public SparkDataSourceReader(Table table) {
this.table = table;
}
@Override
public Filter[] pushFilters(Filter[] filters) {
- SparkFilterConverter converter = new
SparkFilterConverter(table.schema().logicalRowType());
+ SparkFilterConverter converter = new
SparkFilterConverter(table.rowType());
List<Predicate> predicates = new ArrayList<>();
List<Filter> pushed = new ArrayList<>();
for (Filter filter : filters) {
@@ -82,7 +81,7 @@ public class SparkDataSourceReader
@Override
public void pruneColumns(StructType requiredSchema) {
String[] pruneFields = requiredSchema.fieldNames();
- List<String> fieldNames = table.schema().fieldNames();
+ List<String> fieldNames = table.rowType().getFieldNames();
int[] projected = new int[pruneFields.length];
for (int i = 0; i < projected.length; i++) {
projected[i] = fieldNames.indexOf(pruneFields[i]);
@@ -95,9 +94,7 @@ public class SparkDataSourceReader
long rowCount = 0L;
for (Split split : splits()) {
- for (DataFileMeta file : split.files()) {
- rowCount += file.rowCount();
- }
+ rowCount += split.rowCount();
}
final long numRows = rowCount;
@@ -118,7 +115,7 @@ public class SparkDataSourceReader
@Override
public StructType readSchema() {
- RowType rowType = table.schema().logicalRowType();
+ RowType rowType = table.rowType();
return SparkTypeUtils.fromFlinkRowType(
projectedFields == null ? rowType : TypeUtils.project(rowType,
projectedFields));
}
@@ -132,7 +129,7 @@ public class SparkDataSourceReader
protected List<Split> splits() {
if (splits == null) {
- this.splits = table.newScan().withFilter(predicates).plan().splits;
+ this.splits =
table.newScan().withFilter(predicates).plan().splits();
}
return splits;
}
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
index f4c5432a..368e6e82 100644
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
+++
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java
@@ -18,13 +18,11 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.utils.TypeUtils;
@@ -35,8 +33,6 @@ import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.List;
@@ -47,14 +43,13 @@ public class SparkInputPartition implements
InputPartition<InternalRow> {
private static final long serialVersionUID = 1L;
- private final FileStoreTable table;
+ private final Table table;
private final int[] projectedFields;
private final List<Predicate> predicates;
-
- private transient Split split;
+ private final Split split;
public SparkInputPartition(
- FileStoreTable table, int[] projectedFields, List<Predicate>
predicates, Split split) {
+ Table table, int[] projectedFields, List<Predicate> predicates,
Split split) {
this.table = table;
this.projectedFields = projectedFields;
this.predicates = predicates;
@@ -111,17 +106,7 @@ public class SparkInputPartition implements
InputPartition<InternalRow> {
}
private RowType readRowType() {
- RowType rowType = table.schema().logicalRowType();
+ RowType rowType = table.rowType();
return projectedFields == null ? rowType : TypeUtils.project(rowType,
projectedFields);
}
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- split.serialize(new DataOutputViewStreamWrapper(out));
- }
-
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- in.defaultReadObject();
- split = Split.deserialize(new DataInputViewStreamWrapper(in));
- }
}
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index bd2fde5b..0f8dd8f4 100644
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.sources.DataSourceRegister;
@@ -44,7 +43,7 @@ public class SparkSource implements DataSourceRegister,
ReadSupport {
@Override
public DataSourceReader createReader(DataSourceOptions options) {
- FileStoreTable table =
FileStoreTableFactory.create(Configuration.fromMap(options.asMap()));
- return new SparkDataSourceReader(table);
+ return new SparkDataSourceReader(
+
FileStoreTableFactory.create(Configuration.fromMap(options.asMap())));
}
}