This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 08f8d9bf [FLINK-28264] Refactor split and read in table store
08f8d9bf is described below
commit 08f8d9bf664fbfa7a1f0239ccce6163d88361c0a
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 28 13:32:38 2022 +0800
[FLINK-28264] Refactor split and read in table store
This closes #177
---
.../source/ContinuousFileSplitEnumerator.java | 2 +-
.../store/connector/source/FileStoreSource.java | 2 +-
.../connector/source/FileStoreSourceSplit.java | 52 +++-------
.../source/FileStoreSourceSplitGenerator.java | 5 +-
.../source/FileStoreSourceSplitReader.java | 4 +-
.../source/FileStoreSourceSplitSerializer.java | 22 +---
.../table/store/connector/sink/StoreSinkTest.java | 5 +-
.../store/connector/sink/TestFileStoreTable.java | 9 +-
.../source/FileStoreSourceReaderTest.java | 3 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 20 ++--
.../source/FileStoreSourceSplitReaderTest.java | 18 ++--
.../source/FileStoreSourceSplitSerializerTest.java | 45 ++++++++-
.../source/FileStoreSourceSplitStateTest.java | 3 +-
.../PendingSplitsCheckpointSerializerTest.java | 7 +-
.../source/StaticFileStoreSplitEnumeratorTest.java | 3 +-
.../source/TestChangelogDataReadWrite.java | 8 +-
.../apache/flink/table/store/file/KeyValue.java | 5 +
.../table/store/file/data/DataFileReader.java | 11 +-
.../file/operation/AbstractFileStoreScan.java | 5 +-
.../file/operation/AppendOnlyFileStoreRead.java | 9 +-
.../table/store/file/operation/FileStoreRead.java | 9 +-
.../file/operation/KeyValueFileStoreRead.java | 68 +++++--------
.../store/file/utils/ProjectKeyRecordReader.java | 80 +++++++++++++++
.../table/store/table/AbstractFileStoreTable.java | 11 +-
.../store/table/AppendOnlyFileStoreTable.java | 20 ++--
.../table/ChangelogValueCountFileStoreTable.java | 22 +---
.../table/ChangelogWithKeyFileStoreTable.java | 11 +-
.../flink/table/store/table/FileStoreTable.java | 3 +-
.../table/store/table/FileStoreTableFactory.java | 8 +-
.../store/table/source/KeyValueTableRead.java | 8 +-
.../flink/table/store/table/source/Split.java | 35 ++-----
.../flink/table/store/table/source/TableRead.java | 8 +-
.../flink/table/store/table/source/TableScan.java | 20 ++--
.../source/ValueCountRowDataRecordIterator.java | 13 +--
.../flink/table/store/file/TestFileStore.java | 9 +-
.../file/operation/KeyValueFileStoreReadTest.java | 14 +--
.../store/table/AppendOnlyFileStoreTableTest.java | 9 +-
.../ChangelogValueCountFileStoreTableTest.java | 9 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 9 +-
.../table/store/table/FileStoreTableTestBase.java | 17 ++--
.../table/store/table/WritePreemptMemoryTest.java | 3 +-
.../flink/table/store/table/source/SplitTest.java | 3 +-
.../ValueCountRowDataRecordIteratorTest.java | 9 +-
.../table/store/mapred/TableStoreInputFormat.java | 6 +-
.../table/store/mapred/TableStoreInputSplit.java | 111 +++++++--------------
.../store/mapred/TableStoreInputSplitTest.java | 17 ++--
.../store/mapred/TableStoreRecordReaderTest.java | 3 +-
.../table/store/spark/SparkReaderFactory.java | 4 +-
.../apache/flink/table/store/spark/SparkScan.java | 2 +-
.../apache/flink/table/store/spark/SparkTable.java | 2 +-
50 files changed, 380 insertions(+), 401 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 40d7fba4..8a8762c7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -95,7 +95,7 @@ public class ContinuousFileSplitEnumerator
}
private void addSplit(FileStoreSourceSplit split) {
- bucketSplits.computeIfAbsent(split.bucket(), i -> new
LinkedList<>()).add(split);
+ bucketSplits.computeIfAbsent(split.split().bucket(), i -> new
LinkedList<>()).add(split);
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 956d46d5..2752b64a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -80,7 +80,7 @@ public class FileStoreSource
@Override
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
- TableRead read = table.newRead().withIncremental(isContinuous);
+ TableRead read = table.newRead();
if (projectedFields != null) {
read.withProjection(projectedFields);
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
index 114b967d..4ff027ea 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplit.java
@@ -19,10 +19,8 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.table.source.Split;
-import java.util.List;
import java.util.Objects;
/** {@link SourceSplit} of file store. */
@@ -31,42 +29,22 @@ public class FileStoreSourceSplit implements SourceSplit {
/** The unique ID of the split. Unique within the scope of this source. */
private final String id;
- private final BinaryRowData partition;
-
- private final int bucket;
-
- private final List<DataFileMeta> files;
+ private final Split split;
private final long recordsToSkip;
- public FileStoreSourceSplit(
- String id, BinaryRowData partition, int bucket, List<DataFileMeta>
files) {
- this(id, partition, bucket, files, 0);
+ public FileStoreSourceSplit(String id, Split split) {
+ this(id, split, 0);
}
- public FileStoreSourceSplit(
- String id,
- BinaryRowData partition,
- int bucket,
- List<DataFileMeta> files,
- long recordsToSkip) {
+ public FileStoreSourceSplit(String id, Split split, long recordsToSkip) {
this.id = id;
- this.partition = partition;
- this.bucket = bucket;
- this.files = files;
+ this.split = split;
this.recordsToSkip = recordsToSkip;
}
- public BinaryRowData partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public List<DataFileMeta> files() {
- return files;
+ public Split split() {
+ return split;
}
public long recordsToSkip() {
@@ -79,7 +57,7 @@ public class FileStoreSourceSplit implements SourceSplit {
}
public FileStoreSourceSplit updateWithRecordsToSkip(long recordsToSkip) {
- return new FileStoreSourceSplit(id, partition, bucket, files,
recordsToSkip);
+ return new FileStoreSourceSplit(id, split, recordsToSkip);
}
@Override
@@ -90,16 +68,14 @@ public class FileStoreSourceSplit implements SourceSplit {
if (o == null || getClass() != o.getClass()) {
return false;
}
- FileStoreSourceSplit split = (FileStoreSourceSplit) o;
- return bucket == split.bucket
- && recordsToSkip == split.recordsToSkip
- && Objects.equals(id, split.id)
- && Objects.equals(partition, split.partition)
- && Objects.equals(files, split.files);
+ FileStoreSourceSplit other = (FileStoreSourceSplit) o;
+ return Objects.equals(id, other.id)
+ && Objects.equals(this.split, other.split)
+ && recordsToSkip == other.recordsToSkip;
}
@Override
public int hashCode() {
- return Objects.hash(id, partition, bucket, files, recordsToSkip);
+ return Objects.hash(id, split, recordsToSkip);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index ef5efbe4..dc6b9cd3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -37,10 +37,7 @@ public class FileStoreSourceSplitGenerator {
public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
return plan.splits.stream()
- .map(
- s ->
- new FileStoreSourceSplit(
- getNextId(), s.partition(),
s.bucket(), s.files()))
+ .map(s -> new FileStoreSourceSplit(getNextId(), s))
.collect(Collectors.toList());
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 126ce871..3795259b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -123,9 +123,7 @@ public class FileStoreSourceSplitReader
}
currentSplitId = nextSplit.splitId();
- currentReader =
- tableRead.createReader(
- nextSplit.partition(), nextSplit.bucket(),
nextSplit.files());
+ currentReader = tableRead.createReader(nextSplit.split());
currentNumRead = nextSplit.recordsToSkip();
if (currentNumRead > 0) {
seek(currentNumRead);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
index 5f606b96..796373b6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializer.java
@@ -21,24 +21,15 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
+import org.apache.flink.table.store.table.source.Split;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import static
org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
-import static
org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
-
/** A {@link SimpleVersionedSerializer} for {@link FileStoreSourceSplit}. */
public class FileStoreSourceSplitSerializer
implements SimpleVersionedSerializer<FileStoreSourceSplit> {
- private final DataFileMetaSerializer dataFileSerializer;
-
- public FileStoreSourceSplitSerializer() {
- this.dataFileSerializer = new DataFileMetaSerializer();
- }
-
@Override
public int getVersion() {
return 1;
@@ -49,9 +40,7 @@ public class FileStoreSourceSplitSerializer
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
view.writeUTF(split.splitId());
- serializeBinaryRow(split.partition(), view);
- view.writeInt(split.bucket());
- dataFileSerializer.serializeList(split.files(), view);
+ split.split().serialize(view);
view.writeLong(split.recordsToSkip());
return out.toByteArray();
}
@@ -59,11 +48,6 @@ public class FileStoreSourceSplitSerializer
@Override
public FileStoreSourceSplit deserialize(int version, byte[] serialized)
throws IOException {
DataInputDeserializer view = new DataInputDeserializer(serialized);
- return new FileStoreSourceSplit(
- view.readUTF(),
- deserializeBinaryRow(view),
- view.readInt(),
- dataFileSerializer.deserializeList(view),
- view.readLong());
+ return new FileStoreSourceSplit(view.readUTF(),
Split.deserialize(view), view.readLong());
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index d572de26..561dbce0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -89,8 +89,9 @@ public class StoreSinkTest {
@Before
public void before() throws Exception {
+ Path path = new Path(tempFolder.newFolder().toURI().toString());
TableSchema tableSchema =
- new SchemaManager(new
Path(tempFolder.newFolder().toURI().toString()))
+ new SchemaManager(path)
.commitNewVersion(
new UpdateSchema(
RowType.of(
@@ -107,7 +108,7 @@ public class StoreSinkTest {
RowType partitionType = tableSchema.logicalPartitionType();
fileStore = new TestFileStore(hasPk, partitionType);
- table = new TestFileStoreTable(fileStore, tableSchema);
+ table = new TestFileStoreTable(path, fileStore, tableSchema);
}
@Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index 2bd7e41a..52b7febd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -37,17 +38,19 @@ import org.apache.flink.types.RowKind;
/** {@link FileStoreTable} for tests. */
public class TestFileStoreTable implements FileStoreTable {
+ private final Path path;
private final TestFileStore store;
private final TableSchema tableSchema;
- public TestFileStoreTable(TestFileStore store, TableSchema tableSchema) {
+ public TestFileStoreTable(Path path, TestFileStore store, TableSchema
tableSchema) {
+ this.path = path;
this.store = store;
this.tableSchema = tableSchema;
}
@Override
- public String name() {
- return "test";
+ public Path location() {
+ return path;
}
@Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 979444f8..626bdc38 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.Collections;
+import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,6 +64,6 @@ public class FileStoreSourceReaderTest {
}
private static FileStoreSourceSplit createTestFileSplit() {
- return new FileStoreSourceSplit("id1", row(1), 0,
Collections.emptyList());
+ return newSourceSplit("id1", row(1), 0, Collections.emptyList());
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 1b20a9df..d25e9119 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -18,13 +18,11 @@
package org.apache.flink.table.store.connector.source;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableScan;
@@ -79,17 +77,16 @@ public class FileStoreSourceSplitGeneratorTest {
};
List<Split> scanSplits =
TableScan.generateSplits(
- new FileStorePathFactory(new
Path(tempDir.toUri().toString())),
- Collections::singletonList,
- plan.groupByPartFiles());
+ false, Collections::singletonList,
plan.groupByPartFiles());
TableScan.Plan tableScanPlan = new TableScan.Plan(1L, scanSplits);
List<FileStoreSourceSplit> splits =
new
FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
assertThat(splits.size()).isEqualTo(12);
splits.sort(
- Comparator.comparingInt(o -> ((FileStoreSourceSplit)
o).partition().getInt(0))
- .thenComparing(o -> ((FileStoreSourceSplit)
o).bucket()));
+ Comparator.comparingInt(
+ o -> ((FileStoreSourceSplit)
o).split().partition().getInt(0))
+ .thenComparing(o -> ((FileStoreSourceSplit)
o).split().bucket()));
assertSplit(splits.get(0), "0000000007", 1, 0, Arrays.asList("f0",
"f1"));
assertSplit(splits.get(1), "0000000008", 1, 1,
Collections.singletonList("f2"));
assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3",
"f4", "f5"));
@@ -107,9 +104,12 @@ public class FileStoreSourceSplitGeneratorTest {
private void assertSplit(
FileStoreSourceSplit split, String splitId, int part, int bucket,
List<String> files) {
assertThat(split.splitId()).isEqualTo(splitId);
- assertThat(split.partition().getInt(0)).isEqualTo(part);
- assertThat(split.bucket()).isEqualTo(bucket);
-
assertThat(split.files().stream().map(DataFileMeta::fileName).collect(Collectors.toList()))
+ assertThat(split.split().partition().getInt(0)).isEqualTo(part);
+ assertThat(split.split().bucket()).isEqualTo(bucket);
+ assertThat(
+ split.split().files().stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList()))
.isEqualTo(files);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index b5d39f2e..258f8da6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -95,7 +96,7 @@ public class FileStoreSourceSplitReaderTest {
List<Tuple2<Long, Long>> input = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
- assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files,
skip));
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files, skip));
RecordsWithSplitIds<RecordAndPosition<RowData>> records =
reader.fetch();
@@ -131,8 +132,7 @@ public class FileStoreSourceSplitReaderTest {
@Test
public void testPrimaryKeyWithDelete() throws Exception {
TestChangelogDataReadWrite rw = new
TestChangelogDataReadWrite(tempDir.toString(), service);
- FileStoreSourceSplitReader reader =
- new
FileStoreSourceSplitReader(rw.createReadWithKey().withIncremental(true));
+ FileStoreSourceSplitReader reader = new
FileStoreSourceSplitReader(rw.createReadWithKey());
List<Tuple2<Long, Long>> input = kvs();
RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
@@ -150,7 +150,7 @@ public class FileStoreSourceSplitReaderTest {
List<DataFileMeta> files = writer.prepareCommit().newFiles();
writer.close();
- assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true));
RecordsWithSplitIds<RecordAndPosition<RowData>> records =
reader.fetch();
List<Tuple2<RowKind, Long>> expected =
@@ -180,7 +180,7 @@ public class FileStoreSourceSplitReaderTest {
List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
files.addAll(files2);
- assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files));
RecordsWithSplitIds<RecordAndPosition<RowData>> records =
reader.fetch();
assertRecords(
@@ -212,7 +212,7 @@ public class FileStoreSourceSplitReaderTest {
List<Tuple2<Long, Long>> input = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
- assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files,
3));
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 3));
RecordsWithSplitIds<RecordAndPosition<RowData>> records =
reader.fetch();
assertRecords(
@@ -242,7 +242,7 @@ public class FileStoreSourceSplitReaderTest {
List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
files.addAll(files2);
- assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files,
7));
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 7));
RecordsWithSplitIds<RecordAndPosition<RowData>> records =
reader.fetch();
assertRecords(
@@ -268,11 +268,11 @@ public class FileStoreSourceSplitReaderTest {
List<Tuple2<Long, Long>> input1 = kvs();
List<DataFileMeta> files1 = rw.writeFiles(row(1), 0, input1);
- assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0,
files1));
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files1));
List<Tuple2<Long, Long>> input2 = kvs();
List<DataFileMeta> files2 = rw.writeFiles(row(2), 1, input2);
- assignSplit(reader, new FileStoreSourceSplit("id2", row(2), 1,
files2));
+ assignSplit(reader, newSourceSplit("id2", row(2), 1, files2));
RecordsWithSplitIds<RecordAndPosition<RowData>> records =
reader.fetch();
assertRecords(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index d01be01c..90c595e4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -19,13 +19,16 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.table.source.Split;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,7 +39,7 @@ public class FileStoreSourceSplitSerializerTest {
@Test
public void serializeSplit() throws Exception {
final FileStoreSourceSplit split =
- new FileStoreSourceSplit("id", row(1), 2,
Arrays.asList(newFile(0), newFile(1)));
+ newSourceSplit("id", row(1), 2, Arrays.asList(newFile(0),
newFile(1)));
final FileStoreSourceSplit deSerialized =
serializeAndDeserialize(split);
@@ -46,8 +49,7 @@ public class FileStoreSourceSplitSerializerTest {
@Test
public void serializeSplitWithReaderPosition() throws Exception {
final FileStoreSourceSplit split =
- new FileStoreSourceSplit(
- "id", row(1), 2, Arrays.asList(newFile(0),
newFile(1)), 29);
+ newSourceSplit("id", row(1), 2, Arrays.asList(newFile(0),
newFile(1)), 29);
final FileStoreSourceSplit deSerialized =
serializeAndDeserialize(split);
@@ -57,8 +59,7 @@ public class FileStoreSourceSplitSerializerTest {
@Test
public void repeatedSerialization() throws Exception {
final FileStoreSourceSplit split =
- new FileStoreSourceSplit(
- "id", row(1), 2, Arrays.asList(newFile(0),
newFile(1)), 29);
+ newSourceSplit("id", row(1), 2, Arrays.asList(newFile(0),
newFile(1)), 29);
serializeAndDeserialize(split);
serializeAndDeserialize(split);
@@ -86,6 +87,40 @@ public class FileStoreSourceSplitSerializerTest {
level);
}
+ public static FileStoreSourceSplit newSourceSplit(
+ String id, BinaryRowData partition, int bucket, List<DataFileMeta>
files) {
+ return newSourceSplit(id, partition, bucket, files, false, 0);
+ }
+
+ public static FileStoreSourceSplit newSourceSplit(
+ String id,
+ BinaryRowData partition,
+ int bucket,
+ List<DataFileMeta> files,
+ boolean isIncremental) {
+ return newSourceSplit(id, partition, bucket, files, isIncremental, 0);
+ }
+
+ public static FileStoreSourceSplit newSourceSplit(
+ String id,
+ BinaryRowData partition,
+ int bucket,
+ List<DataFileMeta> files,
+ long recordsToSkip) {
+ return newSourceSplit(id, partition, bucket, files, false,
recordsToSkip);
+ }
+
+ public static FileStoreSourceSplit newSourceSplit(
+ String id,
+ BinaryRowData partition,
+ int bucket,
+ List<DataFileMeta> files,
+ boolean isIncremental,
+ long recordsToSkip) {
+ return new FileStoreSourceSplit(
+ id, new Split(partition, bucket, files, isIncremental),
recordsToSkip);
+ }
+
private static FileStoreSourceSplit
serializeAndDeserialize(FileStoreSourceSplit split)
throws IOException {
final FileStoreSourceSplitSerializer serializer = new
FileStoreSourceSplitSerializer();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
index fa268d4a..e6c72d63 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,7 +67,7 @@ public class FileStoreSourceSplitStateTest {
}
private static FileStoreSourceSplit getTestSplit(long recordsToSkip) {
- return new FileStoreSourceSplit(
+ return newSourceSplit(
"id", row(1), 2, Arrays.asList(newFile(0), newFile(1)),
recordsToSkip);
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index 124d5a32..a3ab6a2f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.Collections;
import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -82,15 +83,15 @@ public class PendingSplitsCheckpointSerializerTest {
// ------------------------------------------------------------------------
private static FileStoreSourceSplit testSplit1() {
- return new FileStoreSourceSplit("id1", row(1), 2,
Arrays.asList(newFile(0), newFile(1)));
+ return newSourceSplit("id1", row(1), 2, Arrays.asList(newFile(0),
newFile(1)));
}
private static FileStoreSourceSplit testSplit2() {
- return new FileStoreSourceSplit("id2", row(2), 3,
Arrays.asList(newFile(2), newFile(3)));
+ return newSourceSplit("id2", row(2), 3, Arrays.asList(newFile(2),
newFile(3)));
}
private static FileStoreSourceSplit testSplit3() {
- return new FileStoreSourceSplit("id3", row(3), 4,
Arrays.asList(newFile(5), newFile(6)));
+ return newSourceSplit("id3", row(3), 4, Arrays.asList(newFile(5),
newFile(6)));
}
private static PendingSplitsCheckpoint serializeAndDeserialize(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
index 7fe861d4..6a60b140 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -96,7 +97,7 @@ public class StaticFileStoreSplitEnumeratorTest {
// ------------------------------------------------------------------------
private static FileStoreSourceSplit createRandomSplit() {
- return new FileStoreSourceSplit("split", row(1), 2,
Arrays.asList(newFile(0), newFile(1)));
+ return newSourceSplit("split", row(1), 2, Arrays.asList(newFile(0),
newFile(1)));
}
private static StaticFileStoreSplitEnumerator createEnumerator(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 66c66dd0..feba9da2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -95,7 +95,7 @@ public class TestChangelogDataReadWrite {
}
public TableRead createReadWithValueCount() {
- return createRead(it -> new ValueCountRowDataRecordIterator(it, null));
+ return createRead(ValueCountRowDataRecordIterator::new);
}
private TableRead createRead(
@@ -117,12 +117,6 @@ public class TestChangelogDataReadWrite {
throw new UnsupportedOperationException();
}
- @Override
- public TableRead withIncremental(boolean isIncremental) {
- read.withDropDelete(!isIncremental);
- return this;
- }
-
@Override
protected RecordReader.RecordIterator<RowData>
rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 1978220c..80083b6f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -59,6 +59,11 @@ public class KeyValue {
return this;
}
+ public KeyValue replaceKey(RowData key) {
+ this.key = key;
+ return this;
+ }
+
public RowData key() {
return key;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index aecf25ae..098cf430 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -134,6 +134,7 @@ public class DataFileReader {
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
+ private final int[][] fullKeyProjection;
private int[][] keyProjection;
private int[][] valueProjection;
private RowType projectedKeyType;
@@ -153,7 +154,8 @@ public class DataFileReader {
this.fileFormat = fileFormat;
this.pathFactory = pathFactory;
- this.keyProjection = Projection.range(0,
keyType.getFieldCount()).toNestedIndexes();
+ this.fullKeyProjection = Projection.range(0,
keyType.getFieldCount()).toNestedIndexes();
+ this.keyProjection = fullKeyProjection;
this.valueProjection = Projection.range(0,
valueType.getFieldCount()).toNestedIndexes();
applyProjection();
}
@@ -171,6 +173,13 @@ public class DataFileReader {
}
public DataFileReader create(BinaryRowData partition, int bucket) {
+ return create(partition, bucket, true);
+ }
+
+ public DataFileReader create(BinaryRowData partition, int bucket,
boolean projectKeys) {
+ int[][] keyProjection = projectKeys ? this.keyProjection :
fullKeyProjection;
+ RowType projectedKeyType = projectKeys ? this.projectedKeyType :
keyType;
+
RowType recordType = KeyValue.schema(keyType, valueType);
int[][] projection =
KeyValue.project(keyProjection, valueProjection,
keyType.getFieldCount());
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 4745d41e..3fbb2653 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -40,9 +40,8 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -181,7 +180,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
throw new RuntimeException("Failed to read ManifestEntry list
concurrently", e);
}
- Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+ LinkedHashMap<ManifestEntry.Identifier, ManifestEntry> map = new
LinkedHashMap<>();
for (ManifestEntry entry : entries) {
ManifestEntry.Identifier identifier = entry.identifier();
switch (entry.kind()) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index d8a77d63..346cac87 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -22,7 +22,6 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.AppendOnlyReader;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
@@ -31,6 +30,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
@@ -69,14 +69,13 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<RowData> {
}
@Override
- public RecordReader<RowData> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files)
throws IOException {
+ public RecordReader<RowData> createReader(Split split) throws IOException {
BulkFormat<RowData, FileSourceSplit> readerFactory =
fileFormat.createReaderFactory(rowType, projection);
DataFilePathFactory dataFilePathFactory =
- pathFactory.createDataFilePathFactory(partition, bucket);
+ pathFactory.createDataFilePathFactory(split.partition(),
split.bucket());
List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new
ArrayList<>();
- for (DataFileMeta file : files) {
+ for (DataFileMeta file : split.files()) {
suppliers.add(
() ->
new AppendOnlyReader(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index fbe02fea..43729878 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -18,12 +18,10 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.Split;
import java.io.IOException;
-import java.util.List;
/**
* Read operation which provides {@link RecordReader} creation.
@@ -32,7 +30,6 @@ import java.util.List;
*/
public interface FileStoreRead<T> {
- /** Create a {@link RecordReader} from partition and bucket and files. */
- RecordReader<T> createReader(BinaryRowData partition, int bucket,
List<DataFileMeta> files)
- throws IOException;
+ /** Create a {@link RecordReader} from split. */
+ RecordReader<T> createReader(Split split) throws IOException;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 12cc5ed9..f9eb3b2a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
@@ -30,7 +29,9 @@ import
org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
@@ -48,8 +49,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private final Comparator<RowData> keyComparator;
private final MergeFunction mergeFunction;
- private boolean keyProjected;
- private boolean dropDelete = true;
+ private int[][] keyProjectedFields;
public KeyValueFileStoreRead(
SchemaManager schemaManager,
@@ -65,18 +65,11 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
schemaManager, schemaId, keyType, valueType,
fileFormat, pathFactory);
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
-
- this.keyProjected = false;
- }
-
- public KeyValueFileStoreRead withDropDelete(boolean dropDelete) {
- this.dropDelete = dropDelete;
- return this;
}
public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
dataFileReaderFactory.withKeyProjection(projectedFields);
- keyProjected = true;
+ this.keyProjectedFields = projectedFields;
return this;
}
@@ -85,43 +78,34 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
return this;
}
- /**
- * The resulting reader has the following characteristics:
- *
- * <ul>
- * <li>If {@link KeyValueFileStoreRead#withKeyProjection} is called,
key-values produced by
- * this reader may be unordered and may contain duplicated keys.
- * <li>If {@link KeyValueFileStoreRead#withKeyProjection} is not called,
key-values produced
- * by this reader is guaranteed to be ordered by keys and does not
contain duplicated
- * keys.
- * </ul>
- */
@Override
- public RecordReader<KeyValue> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files)
throws IOException {
- DataFileReader dataFileReader =
dataFileReaderFactory.create(partition, bucket);
- if (keyProjected) {
- // key projection has been applied, so data file readers will not
return key-values in
- // order, we have to return the raw file contents without merging
+ public RecordReader<KeyValue> createReader(Split split) throws IOException
{
+ if (split.isIncremental()) {
+ DataFileReader dataFileReader =
+ dataFileReaderFactory.create(split.partition(),
split.bucket(), true);
+ // Return the raw file contents without merging
List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new
ArrayList<>();
- for (DataFileMeta file : files) {
+ for (DataFileMeta file : split.files()) {
suppliers.add(() -> dataFileReader.read(file.fileName()));
}
-
- if (dropDelete) {
- throw new UnsupportedOperationException(
- "The key is projected, there is no ability to merge
records, so the deleted message cannot be dropped.");
- }
return ConcatRecordReader.create(suppliers);
} else {
- // key projection is not applied, so data file readers will return
key-values in order,
- // in this case merge tree can merge records with same key for us
- return new MergeTreeReader(
- new IntervalPartition(files, keyComparator).partition(),
- dropDelete,
- dataFileReader,
- keyComparator,
- mergeFunction.copy());
+ // in this case merge tree should merge records with same key
+ // Do not project key in MergeTreeReader.
+ DataFileReader dataFileReader =
+ dataFileReaderFactory.create(split.partition(),
split.bucket(), false);
+ MergeTreeReader reader =
+ new MergeTreeReader(
+ new IntervalPartition(split.files(),
keyComparator).partition(),
+ true,
+ dataFileReader,
+ keyComparator,
+ mergeFunction.copy());
+
+ // project key using ProjectKeyRecordReader
+ return keyProjectedFields == null
+ ? reader
+ : new ProjectKeyRecordReader(reader, keyProjectedFields);
}
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ProjectKeyRecordReader.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ProjectKeyRecordReader.java
new file mode 100644
index 00000000..95abb6ba
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/ProjectKeyRecordReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** A {@link RecordReader} with key projection. */
+public class ProjectKeyRecordReader implements RecordReader<KeyValue> {
+
+ private final RecordReader<KeyValue> reader;
+ private final ProjectedRowData projectedRow;
+
+ public ProjectKeyRecordReader(RecordReader<KeyValue> reader, int[][]
keyProjectedFields) {
+ this.reader = reader;
+ this.projectedRow = ProjectedRowData.from(keyProjectedFields);
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<KeyValue> readBatch() throws IOException {
+ RecordIterator<KeyValue> batch = reader.readBatch();
+ if (batch == null) {
+ return null;
+ }
+
+ return new ProjectedIterator(batch, projectedRow);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ /** A {@link RecordIterator} with key projection. */
+ public static class ProjectedIterator implements RecordIterator<KeyValue> {
+
+ private final RecordIterator<KeyValue> iterator;
+ private final ProjectedRowData projectedRow;
+
+ public ProjectedIterator(RecordIterator<KeyValue> iterator,
ProjectedRowData projectedRow) {
+ this.iterator = iterator;
+ this.projectedRow = projectedRow;
+ }
+
+ @Override
+ public KeyValue next() throws IOException {
+ KeyValue kv = iterator.next();
+ if (kv == null) {
+ return null;
+ }
+ return kv.replaceKey(projectedRow.replaceRow(kv.key()));
+ }
+
+ @Override
+ public void releaseBatch() {
+ iterator.releaseBatch();
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index d04f104a..0897416e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -29,19 +30,19 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
private static final long serialVersionUID = 1L;
- private final String name;
+ private final Path path;
protected final TableSchema tableSchema;
- public AbstractFileStoreTable(String name, TableSchema tableSchema) {
- this.name = name;
+ public AbstractFileStoreTable(Path path, TableSchema tableSchema) {
+ this.path = path;
this.tableSchema = tableSchema;
}
protected abstract FileStore<?> store();
@Override
- public String name() {
- return name;
+ public Path location() {
+ return path;
}
@Override
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index b22fd6e7..4fdede94 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -18,12 +18,11 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.AppendOnlyFileStore;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
@@ -37,6 +36,7 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
@@ -44,7 +44,6 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.List;
/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@@ -54,8 +53,8 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
private final AppendOnlyFileStore store;
AppendOnlyFileStoreTable(
- String name, SchemaManager schemaManager, TableSchema tableSchema,
String user) {
- super(name, tableSchema);
+ Path path, SchemaManager schemaManager, TableSchema tableSchema,
String user) {
+ super(path, tableSchema);
this.store =
new AppendOnlyFileStore(
schemaManager,
@@ -94,15 +93,8 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
}
@Override
- public TableRead withIncremental(boolean isIncremental) {
- return this;
- }
-
- @Override
- public RecordReader<RowData> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta>
files)
- throws IOException {
- return read.createReader(partition, bucket, files);
+ public RecordReader<RowData> createReader(Split split) throws
IOException {
+ return read.createReader(split);
}
};
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 5319c7b5..d622f6ec 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileStoreOptions;
@@ -56,8 +57,8 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
private final KeyValueFileStore store;
ChangelogValueCountFileStoreTable(
- String name, SchemaManager schemaManager, TableSchema tableSchema,
String user) {
- super(name, tableSchema);
+ Path path, SchemaManager schemaManager, TableSchema tableSchema,
String user) {
+ super(path, tableSchema);
RowType countType =
RowType.of(
new LogicalType[] {new BigIntType(false)}, new
String[] {"_VALUE_COUNT"});
@@ -96,30 +97,17 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableRead newRead() {
return new KeyValueTableRead(store.newRead()) {
- private int[][] projection = null;
- private boolean isIncremental = false;
@Override
public TableRead withProjection(int[][] projection) {
- if (isIncremental) {
- read.withKeyProjection(projection);
- } else {
- this.projection = projection;
- }
- return this;
- }
-
- @Override
- public TableRead withIncremental(boolean isIncremental) {
- this.isIncremental = isIncremental;
- read.withDropDelete(!isIncremental);
+ read.withKeyProjection(projection);
return this;
}
@Override
protected RecordReader.RecordIterator<RowData>
rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
- return new ValueCountRowDataRecordIterator(kvRecordIterator,
projection);
+ return new ValueCountRowDataRecordIterator(kvRecordIterator);
}
};
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index bb26efa0..1c32917f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
@@ -61,8 +62,8 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
private final KeyValueFileStore store;
ChangelogWithKeyFileStoreTable(
- String name, SchemaManager schemaManager, TableSchema tableSchema,
String user) {
- super(name, tableSchema);
+ Path path, SchemaManager schemaManager, TableSchema tableSchema,
String user) {
+ super(path, tableSchema);
RowType rowType = tableSchema.logicalRowType();
// add _KEY_ prefix to avoid conflict with value
@@ -158,12 +159,6 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
return this;
}
- @Override
- public TableRead withIncremental(boolean isIncremental) {
- read.withDropDelete(!isIncremental);
- return this;
- }
-
@Override
protected RecordReader.RecordIterator<RowData>
rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 7b7d13cd..82074f32 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -34,7 +35,7 @@ import java.io.Serializable;
*/
public interface FileStoreTable extends Serializable {
- String name();
+ Path location();
TableSchema schema();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 418706c4..104ae0bb 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -38,7 +38,6 @@ public class FileStoreTableFactory {
public static FileStoreTable create(Configuration conf, String user) {
Path tablePath = FileStoreOptions.path(conf);
- String name = tablePath.getName();
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager
@@ -56,13 +55,14 @@ public class FileStoreTableFactory {
tableSchema = tableSchema.copy(newOptions);
if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
- return new AppendOnlyFileStoreTable(name, schemaManager,
tableSchema, user);
+ return new AppendOnlyFileStoreTable(tablePath, schemaManager,
tableSchema, user);
} else {
if (tableSchema.primaryKeys().isEmpty()) {
return new ChangelogValueCountFileStoreTable(
- name, schemaManager, tableSchema, user);
+ tablePath, schemaManager, tableSchema, user);
} else {
- return new ChangelogWithKeyFileStoreTable(name, schemaManager,
tableSchema, user);
+ return new ChangelogWithKeyFileStoreTable(
+ tablePath, schemaManager, tableSchema, user);
}
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
index f5336065..c3939a4d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
@@ -19,16 +19,13 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.utils.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.List;
/**
* An abstraction layer above {@link KeyValueFileStoreRead} to provide reading
of {@link RowData}.
@@ -42,9 +39,8 @@ public abstract class KeyValueTableRead implements TableRead {
}
@Override
- public RecordReader<RowData> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files)
throws IOException {
- return new RowDataRecordReader(read.createReader(partition, bucket,
files));
+ public RecordReader<RowData> createReader(Split split) throws IOException {
+ return new RowDataRecordReader(read.createReader(split));
}
protected abstract RecordReader.RecordIterator<RowData>
rowDataRecordIteratorFromKv(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
index 29f5be37..d8dad370 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/Split.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.table.source;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -31,26 +30,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import static
org.apache.flink.table.store.file.utils.SerializationUtils.deserializedBytes;
-import static
org.apache.flink.table.store.file.utils.SerializationUtils.serializeBytes;
-import static org.apache.flink.util.InstantiationUtil.deserializeObject;
-import static org.apache.flink.util.InstantiationUtil.serializeObject;
-
/** Input splits. Needed by most batch computation engines. */
public class Split {
private final BinaryRowData partition;
private final int bucket;
private final List<DataFileMeta> files;
+ private final boolean isIncremental;
- private final Path bucketPath;
-
- public Split(BinaryRowData partition, int bucket, List<DataFileMeta>
files, Path bucketPath) {
+ public Split(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files,
boolean isIncremental) {
this.partition = partition;
this.bucket = bucket;
this.files = files;
-
- this.bucketPath = bucketPath;
+ this.isIncremental = isIncremental;
}
public BinaryRowData partition() {
@@ -65,8 +58,8 @@ public class Split {
return files;
}
- public Path bucketPath() {
- return bucketPath;
+ public boolean isIncremental() {
+ return isIncremental;
}
@Override
@@ -81,12 +74,12 @@ public class Split {
return bucket == split.bucket
&& Objects.equals(partition, split.partition)
&& Objects.equals(files, split.files)
- && Objects.equals(bucketPath, split.bucketPath);
+ && isIncremental == split.isIncremental;
}
@Override
public int hashCode() {
- return Objects.hash(partition, bucket, files, bucketPath);
+ return Objects.hash(partition, bucket, files, isIncremental);
}
public void serialize(DataOutputView out) throws IOException {
@@ -97,7 +90,7 @@ public class Split {
for (DataFileMeta file : files) {
dataFileSer.serialize(file, out);
}
- serializeBytes(out, serializeObject(bucketPath));
+ out.writeBoolean(isIncremental);
}
public static Split deserialize(DataInputView in) throws IOException {
@@ -109,14 +102,6 @@ public class Split {
for (int i = 0; i < fileNumber; i++) {
files.add(dataFileSer.deserialize(in));
}
- Path bucketPath;
- try {
- bucketPath =
- deserializeObject(
- deserializedBytes(in),
Thread.currentThread().getContextClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- return new Split(partition, bucket, files, bucketPath);
+ return new Split(partition, bucket, files, in.readBoolean());
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index 013c1b49..b8fa4061 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -19,14 +19,11 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.utils.RecordReader;
import java.io.IOException;
import java.util.Arrays;
-import java.util.List;
/** An abstraction layer above {@link FileStoreRead} to provide reading of
{@link RowData}. */
public interface TableRead {
@@ -41,8 +38,5 @@ public interface TableRead {
TableRead withProjection(int[][] projection);
- TableRead withIncremental(boolean isIncremental);
-
- RecordReader<RowData> createReader(
- BinaryRowData partition, int bucket, List<DataFileMeta> files)
throws IOException;
+ RecordReader<RowData> createReader(Split split) throws IOException;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 2ca147d9..f69ae473 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreScan;
@@ -44,6 +43,8 @@ public abstract class TableScan {
private final TableSchema tableSchema;
private final FileStorePathFactory pathFactory;
+ private boolean isIncremental = false;
+
protected TableScan(
FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory
pathFactory) {
this.scan = scan;
@@ -91,6 +92,7 @@ public abstract class TableScan {
}
public TableScan withIncremental(boolean isIncremental) {
+ this.isIncremental = isIncremental;
scan.withIncremental(isIncremental);
return this;
}
@@ -102,12 +104,12 @@ public abstract class TableScan {
private List<Split> generateSplits(
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFiles) {
- return generateSplits(pathFactory, splitGenerator(pathFactory),
groupedDataFiles);
+ return generateSplits(isIncremental, splitGenerator(pathFactory),
groupedDataFiles);
}
@VisibleForTesting
public static List<Split> generateSplits(
- FileStorePathFactory pathFactory,
+ boolean isIncremental,
SplitGenerator splitGenerator,
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>
groupedDataFiles) {
List<Split> splits = new ArrayList<>();
@@ -117,10 +119,14 @@ public abstract class TableScan {
Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
int bucket = bucketEntry.getKey();
- Path bucketPath = pathFactory.bucketPath(partition, bucket);
- splitGenerator.split(bucketEntry.getValue()).stream()
- .map(files -> new Split(partition, bucket, files,
bucketPath))
- .forEach(splits::add);
+ if (isIncremental) {
+ // Don't split when incremental
+ splits.add(new Split(partition, bucket,
bucketEntry.getValue(), true));
+ } else {
+ splitGenerator.split(bucketEntry.getValue()).stream()
+ .map(files -> new Split(partition, bucket, files,
false))
+ .forEach(splits::add);
+ }
}
}
return splits;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
index 1d37cc0d..246a683a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIterator.java
@@ -19,13 +19,10 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.types.RowKind;
-import javax.annotation.Nullable;
-
import java.io.IOException;
/**
@@ -35,16 +32,11 @@ import java.io.IOException;
*/
public class ValueCountRowDataRecordIterator extends
ResetRowKindRecordIterator {
- private final @Nullable ProjectedRowData projectedRowData;
-
private RowData rowData;
private long count;
- public ValueCountRowDataRecordIterator(
- RecordReader.RecordIterator<KeyValue> kvIterator, @Nullable
int[][] projection) {
+ public
ValueCountRowDataRecordIterator(RecordReader.RecordIterator<KeyValue>
kvIterator) {
super(kvIterator);
- this.projectedRowData = projection == null ? null :
ProjectedRowData.from(projection);
-
this.rowData = null;
this.count = 0;
}
@@ -61,8 +53,7 @@ public class ValueCountRowDataRecordIterator extends
ResetRowKindRecordIterator
return null;
}
- rowData =
- projectedRowData == null ? kv.key() :
projectedRowData.replaceRow(kv.key());
+ rowData = kv.key();
long value = kv.value().getLong(0);
if (value < 0) {
rowData.setRowKind(RowKind.DELETE);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index bf3fce36..fb290140 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -43,6 +43,7 @@ import
org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
@@ -292,9 +293,11 @@ public class TestFileStore extends KeyValueFileStore {
RecordReaderIterator<KeyValue> iterator =
new RecordReaderIterator<>(
read.createReader(
- entryWithPartition.getKey(),
- entryWithBucket.getKey(),
- entryWithBucket.getValue()));
+ new Split(
+ entryWithPartition.getKey(),
+ entryWithBucket.getKey(),
+ entryWithBucket.getValue(),
+ false)));
while (iterator.hasNext()) {
kvs.add(iterator.next().copy(keySerializer,
valueSerializer));
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index 30866470..a2086ba8 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -30,6 +30,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -192,7 +193,6 @@ public class KeyValueFileStoreReadTest {
KeyValueFileStoreRead read = store.newRead();
if (keyProjection != null) {
read.withKeyProjection(keyProjection);
- read.withDropDelete(false);
}
if (valueProjection != null) {
read.withValueProjection(valueProjection);
@@ -203,11 +203,13 @@ public class KeyValueFileStoreReadTest {
filesGroupedByPartition.entrySet()) {
RecordReader<KeyValue> reader =
read.createReader(
- entry.getKey(),
- 0,
- entry.getValue().stream()
- .map(ManifestEntry::file)
- .collect(Collectors.toList()));
+ new Split(
+ entry.getKey(),
+ 0,
+ entry.getValue().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList()),
+ false));
RecordReaderIterator<KeyValue> actualIterator = new
RecordReaderIterator<>(reader);
while (actualIterator.hasNext()) {
result.add(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 92fc82ac..85579d09 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -98,7 +98,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
- TableRead read = table.newRead().withIncremental(true);
+ TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -111,7 +111,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
- TableRead read =
table.newRead().withIncremental(true).withProjection(PROJECTION);
+ TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("+101|11", "+102|12"));
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -127,7 +127,7 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate predicate = builder.equal(2, 101L);
List<Split> splits =
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
- TableRead read = table.newRead().withIncremental(true);
+ TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
@@ -175,7 +175,6 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
Collections.emptyList(),
conf.toMap(),
""));
- return new AppendOnlyFileStoreTable(
- tablePath.getName(), schemaManager, tableSchema, "user");
+ return new AppendOnlyFileStoreTable(tablePath, schemaManager,
tableSchema, "user");
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 6f2cccc1..fcc1a3f8 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -97,7 +97,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
FileStoreTable table = createFileStoreTable();
List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
- TableRead read = table.newRead().withIncremental(true);
+ TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -110,7 +110,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
FileStoreTable table = createFileStoreTable();
List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
- TableRead read =
table.newRead().withIncremental(true).withProjection(PROJECTION);
+ TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Arrays.asList("-100|10", "+101|11"));
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
@@ -126,7 +126,7 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
Predicate predicate = builder.equal(2, 201L);
List<Split> splits =
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
- TableRead read = table.newRead().withIncremental(true);
+ TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -178,7 +178,6 @@ public class ChangelogValueCountFileStoreTableTest extends
FileStoreTableTestBas
Collections.emptyList(),
conf.toMap(),
""));
- return new ChangelogValueCountFileStoreTable(
- tablePath.getName(), schemaManager, tableSchema, "user");
+ return new ChangelogValueCountFileStoreTable(tablePath, schemaManager,
tableSchema, "user");
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 8cdad460..6b9e7a49 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -97,7 +97,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
- TableRead read = table.newRead().withIncremental(true);
+ TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(Collections.singletonList("-1|11|1001"));
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
@@ -110,7 +110,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
FileStoreTable table = createFileStoreTable();
List<Split> splits =
table.newScan().withIncremental(true).plan().splits;
- TableRead read =
table.newRead().withIncremental(true).withProjection(PROJECTION);
+ TableRead read = table.newRead().withProjection(PROJECTION);
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_PROJECTED_ROW_TO_STRING))
.isEqualTo(Collections.singletonList("-1001|11"));
@@ -127,7 +127,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Predicate predicate = PredicateBuilder.and(builder.equal(2, 201L),
builder.equal(1, 21));
List<Split> splits =
table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
- TableRead read = table.newRead().withIncremental(true);
+ TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0,
STREAMING_ROW_TO_STRING)).isEmpty();
assertThat(getResult(read, splits, binaryRow(2), 0,
STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -177,7 +177,6 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Arrays.asList("pt", "a"),
conf.toMap(),
""));
- return new ChangelogWithKeyFileStoreTable(
- tablePath.getName(), schemaManager, tableSchema, "user");
+ return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager,
tableSchema, "user");
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 0dd8171b..d36f6423 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -23,7 +23,8 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import
org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.sink.TableWrite;
@@ -103,8 +104,11 @@ public abstract class FileStoreTableTestBase {
int bucket,
Function<RowData, String> rowDataToString)
throws Exception {
- RecordReader<RowData> recordReader =
- read.createReader(partition, bucket, getFilesFor(splits,
partition, bucket));
+ List<ReaderSupplier<RowData>> readers = new ArrayList<>();
+ for (Split split : getSplitsFor(splits, partition, bucket)) {
+ readers.add(() -> read.createReader(split));
+ }
+ RecordReader<RowData> recordReader =
ConcatRecordReader.create(readers);
RecordReaderIterator<RowData> iterator = new
RecordReaderIterator<>(recordReader);
List<String> result = new ArrayList<>();
while (iterator.hasNext()) {
@@ -115,12 +119,11 @@ public abstract class FileStoreTableTestBase {
return result;
}
- private List<DataFileMeta> getFilesFor(
- List<Split> splits, BinaryRowData partition, int bucket) {
- List<DataFileMeta> result = new ArrayList<>();
+ private List<Split> getSplitsFor(List<Split> splits, BinaryRowData
partition, int bucket) {
+ List<Split> result = new ArrayList<>();
for (Split split : splits) {
if (split.partition().equals(partition) && split.bucket() ==
bucket) {
- result.addAll(split.files());
+ result.add(split);
}
}
return result;
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 0e81baec..4c6d02fe 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -103,7 +103,6 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
Arrays.asList("pt", "a"),
conf.toMap(),
""));
- return new ChangelogWithKeyFileStoreTable(
- tablePath.getName(), schemaManager, schema, "user");
+ return new ChangelogWithKeyFileStoreTable(tablePath, schemaManager,
schema, "user");
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
index 9eb754e6..7540b7b5 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/SplitTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.table.source;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.store.file.data.DataFileMeta;
@@ -45,7 +44,7 @@ public class SplitTest {
for (int i = 0; i < ThreadLocalRandom.current().nextInt(10); i++) {
files.add(gen.next().meta);
}
- Split split = new Split(data.partition, data.bucket, files, new
Path("/tmp/test"));
+ Split split = new Split(data.partition, data.bucket, files, false);
ByteArrayOutputStream out = new ByteArrayOutputStream();
split.serialize(new DataOutputViewStreamWrapper(out));
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
index 0f6dd6f5..72a29253 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/ValueCountRowDataRecordIteratorTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.store.table.source;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.store.file.utils.ProjectKeyRecordReader;
import org.apache.flink.table.store.file.utils.ReusingTestData;
import org.apache.flink.types.RowKind;
@@ -48,7 +50,7 @@ public class ValueCountRowDataRecordIteratorTest extends
RowDataRecordIteratorTe
testIterator(
input,
- kvIterator -> new ValueCountRowDataRecordIterator(kvIterator,
null),
+ ValueCountRowDataRecordIterator::new,
(rowData, idx) -> {
assertThat(rowData.getArity()).isEqualTo(1);
assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
@@ -75,7 +77,10 @@ public class ValueCountRowDataRecordIteratorTest extends
RowDataRecordIteratorTe
input,
kvIterator ->
new ValueCountRowDataRecordIterator(
- kvIterator, new int[][] {new int[] {0}, new
int[] {0}}),
+ new ProjectKeyRecordReader.ProjectedIterator(
+ kvIterator,
+ ProjectedRowData.from(
+ new int[][] {new int[] {0},
new int[] {0}}))),
(rowData, idx) -> {
assertThat(rowData.getArity()).isEqualTo(2);
assertThat(rowData.getInt(0)).isEqualTo(expectedValues.get(idx));
diff --git
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 726ca878..98f502ac 100644
---
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++
b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -55,7 +55,7 @@ public class TableStoreInputFormat implements
InputFormat<Void, RowDataContainer
TableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
return scan.plan().splits.stream()
- .map(TableStoreInputSplit::create)
+ .map(split ->
TableStoreInputSplit.create(table.location().toString(), split))
.toArray(TableStoreInputSplit[]::new);
}
@@ -65,9 +65,7 @@ public class TableStoreInputFormat implements
InputFormat<Void, RowDataContainer
FileStoreTable table = createFileStoreTable(jobConf);
TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
long splitLength = split.getLength();
- return new TableStoreRecordReader(
- table.newRead().createReader(split.partition(),
split.bucket(), split.files()),
- splitLength);
+ return new
TableStoreRecordReader(table.newRead().createReader(split.split()),
splitLength);
}
private FileStoreTable createFileStoreTable(JobConf jobConf) {
diff --git
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index 4bbef384..2c041305 100644
---
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++
b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -18,24 +18,17 @@
package org.apache.flink.table.store.mapred;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
-import org.apache.flink.table.store.file.utils.SerializationUtils;
import org.apache.flink.table.store.table.source.Split;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.Objects;
/**
@@ -46,42 +39,28 @@ public class TableStoreInputSplit extends FileSplit {
private static final String[] ANYWHERE = new String[] {"*"};
- private BinaryRowData partition;
- private int bucket;
- private List<DataFileMeta> files;
- private String bucketPath;
+ private String path;
+ private Split split;
// public no-argument constructor for deserialization
public TableStoreInputSplit() {}
- public TableStoreInputSplit(
- BinaryRowData partition, int bucket, List<DataFileMeta> files,
String bucketPath) {
- this.partition = partition;
- this.bucket = bucket;
- this.files = files;
- this.bucketPath = bucketPath;
+ public TableStoreInputSplit(String path, Split split) {
+ this.path = path;
+ this.split = split;
}
- public static TableStoreInputSplit create(Split split) {
- return new TableStoreInputSplit(
- split.partition(), split.bucket(), split.files(),
split.bucketPath().toString());
+ public static TableStoreInputSplit create(String path, Split split) {
+ return new TableStoreInputSplit(path, split);
}
- public BinaryRowData partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public List<DataFileMeta> files() {
- return files;
+ public Split split() {
+ return split;
}
@Override
public Path getPath() {
- return new Path(bucketPath);
+ return new Path(path);
}
@Override
@@ -91,7 +70,7 @@ public class TableStoreInputSplit extends FileSplit {
@Override
public long getLength() {
- return files.stream().mapToLong(DataFileMeta::fileSize).sum();
+ return split.files().stream().mapToLong(DataFileMeta::fileSize).sum();
}
@Override
@@ -101,63 +80,41 @@ public class TableStoreInputSplit extends FileSplit {
@Override
public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeInt(bucket);
-
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
- DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(bos);
- SerializationUtils.serializeBinaryRow(partition, view);
- DataFileMetaSerializer metaSerializer = new
DataFileMetaSerializer();
- metaSerializer.serializeList(files, view);
- byte[] bytes = bos.toByteArray();
- dataOutput.writeInt(bytes.length);
- dataOutput.write(bytes);
- }
-
- byte[] bucketPathBytes = bucketPath.getBytes();
- dataOutput.writeInt(bucketPathBytes.length);
- dataOutput.write(bucketPathBytes);
+ dataOutput.writeUTF(path);
+ DataOutputSerializer out = new DataOutputSerializer(128);
+ split.serialize(out);
+ dataOutput.writeInt(out.length());
+ dataOutput.write(out.getCopyOfBuffer());
}
@Override
public void readFields(DataInput dataInput) throws IOException {
- bucket = dataInput.readInt();
-
- byte[] fieldBytes = new byte[dataInput.readInt()];
- dataInput.readFully(fieldBytes);
- try (ByteArrayInputStream bis = new ByteArrayInputStream(fieldBytes)) {
- DataInputViewStreamWrapper view = new
DataInputViewStreamWrapper(bis);
- partition = SerializationUtils.deserializeBinaryRow(view);
- DataFileMetaSerializer metaSerializer = new
DataFileMetaSerializer();
- files = metaSerializer.deserializeList(view);
- }
-
- byte[] bucketPathBytes = new byte[dataInput.readInt()];
- dataInput.readFully(bucketPathBytes);
- bucketPath = new String(bucketPathBytes);
+ path = dataInput.readUTF();
+ int length = dataInput.readInt();
+ byte[] bytes = new byte[length];
+ dataInput.readFully(bytes);
+ split = Split.deserialize(new DataInputDeserializer(bytes));
}
@Override
public String toString() {
- return "{"
- + String.join(
- ", ",
- Arrays.asList(
- "partition: " + partition.toString(),
- "bucket: " + bucket,
- "files: " + files.toString(),
- "bucketPath: " + bucketPath))
- + "}";
+ return "{" + "path='" + path + '\'' + ", split=" + split + '}';
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof TableStoreInputSplit)) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
}
TableStoreInputSplit that = (TableStoreInputSplit) o;
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && Objects.equals(files, that.files)
- && Objects.equals(bucketPath, that.bucketPath);
+ return Objects.equals(path, that.path) && Objects.equals(split,
that.split);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, split);
}
}
diff --git
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
index 90e25fcf..4321abd7 100644
---
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
+++
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.mapred;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileTestDataGenerator;
+import org.apache.flink.table.store.table.source.Split;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -52,13 +53,15 @@ public class TableStoreInputSplitTest {
BinaryRowData wantedPartition = generated.get(0).partition;
TableStoreInputSplit split =
new TableStoreInputSplit(
- wantedPartition,
- 0,
- generated.stream()
- .filter(d ->
d.partition.equals(wantedPartition))
- .map(d -> d.meta)
- .collect(Collectors.toList()),
- tempDir.toString());
+ tempDir.toString(),
+ new Split(
+ wantedPartition,
+ 0,
+ generated.stream()
+ .filter(d ->
d.partition.equals(wantedPartition))
+ .map(d -> d.meta)
+ .collect(Collectors.toList()),
+ false));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
diff --git
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 064a239e..6befdfff 100644
---
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -142,7 +142,8 @@ public class TableStoreRecordReaderTest {
for (Split split : table.newScan().plan().splits) {
if (split.partition().equals(partition) && split.bucket() ==
bucket) {
return Tuple2.of(
- table.newRead().createReader(partition, bucket,
split.files()),
+ table.newRead()
+ .createReader(new Split(partition, bucket,
split.files(), false)),
split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
}
}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
index 2e792aff..f247f908 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkReaderFactory.java
@@ -21,7 +21,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -52,13 +51,12 @@ public class SparkReaderFactory implements
PartitionReaderFactory {
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition)
{
- Split split = ((SparkInputPartition) partition).split();
RecordReader<RowData> reader;
try {
reader =
table.newRead()
.withProjection(projectedFields)
- .createReader(split.partition(), split.bucket(),
split.files());
+ .createReader(((SparkInputPartition)
partition).split());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
index f7425953..564ca215 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkScan.java
@@ -57,7 +57,7 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
@Override
public String description() {
// TODO add filters
- return String.format("tablestore(%s)", table.name());
+ return String.format("tablestore(%s)", table.location().getName());
}
@Override
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index 0d1648ae..c5ec1a18 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -47,7 +47,7 @@ public class SparkTable implements
org.apache.spark.sql.connector.catalog.Table,
@Override
public String name() {
- return table.name();
+ return table.location().getName();
}
@Override