This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 2b9c186e [FLINK-27973] Introduce TableWrite and TableCommit as an abstraction layer above FileStore for writing RowData
2b9c186e is described below
commit 2b9c186e771db18af08bac2a6a2f8cd6f1a084ae
Author: tsreaper <[email protected]>
AuthorDate: Fri Jun 10 10:56:38 2022 +0800
[FLINK-27973] Introduce TableWrite and TableCommit as an abstraction layer above FileStore for writing RowData
This closes #150
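For context, a minimal usage sketch of the new abstraction layer, assembled
from the test code added by this commit (see FileStoreTableTestBase and its
subclasses below); createFileStoreTable() is the test helper defined there:

    FileStoreTable table = createFileStoreTable();

    // write RowData through TableWrite
    TableWrite write = table.newWrite();
    write.write(GenericRowData.of(1, 10, 100L));
    write.write(GenericRowData.of(2, 20, 200L));

    // commit the produced file changes under a commit identifier,
    // then release the writers
    table.newCommit().commit("0", write.prepareCommit());
    write.close();

    // read back: plan splits, then read them
    List<Split> splits = table.newScan().plan().splits;
    TableRead read = table.newRead();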
---
.../connector/sink/BucketStreamPartitioner.java | 2 +-
.../connector/sink/CommittableSerializer.java | 1 +
.../connector/sink/FileCommittableSerializer.java | 1 +
.../store/connector/sink/StoreGlobalCommitter.java | 1 +
.../table/store/connector/sink/StoreSink.java | 4 +-
.../store/connector/sink/StoreSinkWriter.java | 5 +-
.../connector/sink/CommittableSerializerTest.java | 1 +
.../sink/FileCommittableSerializerTest.java | 1 +
.../flink/table/store/file/FileStoreImpl.java | 4 +
.../flink/table/store/file/schema/Schema.java | 9 +-
.../flink/table/store/log/LogSinkProvider.java | 2 +-
.../table/store/table/AbstractFileStoreTable.java | 9 +
.../store/table/AppendOnlyFileStoreTable.java | 42 ++++-
.../table/ChangelogValueCountFileStoreTable.java | 55 ++++--
.../table/ChangelogWithKeyFileStoreTable.java | 78 +++++++--
.../flink/table/store/table/FileStoreTable.java | 14 +-
.../table/store/table}/sink/FileCommittable.java | 2 +-
.../table/store/{ => table}/sink/SinkRecord.java | 2 +-
.../{ => table}/sink/SinkRecordConverter.java | 12 +-
.../flink/table/store/table/sink/TableCommit.java | 81 +++++++++
.../flink/table/store/table/sink/TableWrite.java | 133 ++++++++++++++
.../flink/table/store/table/source/TableRead.java | 2 +
.../flink/table/store/table/source/TableScan.java | 23 ++-
.../store/table/AppendOnlyFileStoreTableTest.java | 187 ++++++++++++++++++++
.../ChangelogValueCountFileStoreTableTest.java | 184 +++++++++++++++++++
.../table/ChangelogWithKeyFileStoreTableTest.java | 194 +++++++++++++++++++++
.../table/store/table/FileStoreTableTestBase.java | 138 +++++++++++++++
.../table/store/mapred/TableStoreInputFormat.java | 4 +-
.../flink/table/store/FileStoreTestHelper.java | 141 ---------------
.../flink/table/store/FileStoreTestUtils.java | 53 ++++++
.../hive/TableStoreHiveStorageHandlerITCase.java | 191 ++++++++------------
.../store/mapred/TableStoreRecordReaderTest.java | 111 +++++-------
.../store/kafka/KafkaLogSerializationSchema.java | 2 +-
.../table/store/kafka/KafkaLogSinkProvider.java | 2 +-
.../store/kafka/KafkaLogSerializationTest.java | 2 +-
.../flink/table/store/kafka/KafkaLogTestUtils.java | 2 +-
.../table/store/kafka/TestOffsetsLogSink.java | 2 +-
37 files changed, 1309 insertions(+), 388 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index dedf5d88..c663b4c3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.types.logical.RowType;
/** A {@link StreamPartitioner} to partition records by bucket. */
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index 95938625..9f6dc1d1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector.sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.store.table.sink.FileCommittable;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
index ede31b28..3bd03110 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.table.sink.FileCommittable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index 20e405e8..f02e9eb1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreExpire;
+import org.apache.flink.table.store.table.sink.FileCommittable;
import javax.annotation.Nullable;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index a7ada476..5a3d498c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -36,8 +36,8 @@ import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.log.LogInitContext;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogWriteCallback;
-import org.apache.flink.table.store.sink.SinkRecord;
-import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import javax.annotation.Nullable;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 5bdb37f5..9a914636 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -31,8 +31,9 @@ import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.log.LogWriteCallback;
-import org.apache.flink.table.store.sink.SinkRecord;
-import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index c451c18a..98db8f5b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.table.sink.FileCommittable;
import org.junit.jupiter.api.Test;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
index 9bf8e996..73b66f03 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector.sink;
import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.table.sink.FileCommittable;
import org.junit.jupiter.api.Test;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index b4d2a640..ea196f4e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -88,6 +88,10 @@ public class FileStoreImpl implements FileStore {
options.fileFormat().getFormatIdentifier());
}
+ public FileStoreOptions options() {
+ return options;
+ }
+
@VisibleForTesting
public ManifestFile.Factory manifestFileFactory() {
return new ManifestFile.Factory(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
index 6b13cf17..2afc641f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/Schema.java
@@ -143,8 +143,13 @@ public class Schema implements Serializable {
return projectedLogicalRowType(partitionKeys);
}
- public RowType logicalPrimaryKeysType() {
- return projectedLogicalRowType(primaryKeys);
+ public RowType logicalTrimmedPrimaryKeysType() {
+ return projectedLogicalRowType(trimmedPrimaryKeys());
+ }
+
+ public int[] projection(List<String> projectedFieldNames) {
+ List<String> fieldNames = fieldNames();
+        return projectedFieldNames.stream().mapToInt(fieldNames::indexOf).toArray();
}
private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
index 8a091fa2..effa38e8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.log;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
import java.io.Serializable;
import java.util.function.Consumer;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 39b1887d..6d52c0b3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,7 +18,9 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.types.logical.RowType;
/** Abstract {@link FileStoreTable}. */
@@ -43,4 +45,11 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
public RowType rowType() {
return schema.logicalRowType();
}
+
+ @Override
+ public TableCommit newCommit() {
+ return new TableCommit(store().newCommit(), store().newExpire());
+ }
+
+ protected abstract FileStore store();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 1c0463a7..70e31059 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -19,31 +19,36 @@
package org.apache.flink.table.store.table;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
/** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
- private final Schema schema;
private final FileStoreImpl store;
AppendOnlyFileStoreTable(String name, Schema schema, String user) {
super(name, schema);
- this.schema = schema;
this.store =
new FileStoreImpl(
schema.id(),
@@ -57,9 +62,8 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableScan newScan(boolean incremental) {
- FileStoreScan scan = store.newScan().withIncremental(incremental);
- return new TableScan(scan, schema, store.pathFactory()) {
+ public TableScan newScan() {
+ return new TableScan(store.newScan(), schema, store.pathFactory()) {
@Override
protected void withNonPartitionFilter(Predicate predicate) {
scan.withValueFilter(predicate);
@@ -68,7 +72,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableRead newRead(boolean incremental) {
+ public TableRead newRead() {
return new TableRead(store.newRead()) {
@Override
public TableRead withProjection(int[][] projection) {
@@ -76,6 +80,11 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
return this;
}
+ @Override
+ public TableRead withIncremental(boolean isIncremental) {
+ return this;
+ }
+
@Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
@@ -85,7 +94,24 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public FileStore fileStore() {
+ public TableWrite newWrite() {
+ SinkRecordConverter recordConverter =
+ new SinkRecordConverter(store.options().bucket(), schema);
+ return new TableWrite(store.newWrite(), recordConverter) {
+ @Override
+        protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ throws Exception {
+ Preconditions.checkState(
+ record.row().getRowKind() == RowKind.INSERT,
+                    "Append only writer can not accept row with RowKind %s",
+ record.row().getRowKind());
+            writer.write(ValueKind.ADD, BinaryRowDataUtil.EMPTY_ROW, record.row());
+ }
+ };
+ }
+
+ @Override
+ public FileStore store() {
return store;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 66d8b18b..071e8f56 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -18,19 +18,23 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -64,9 +68,8 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableScan newScan(boolean incremental) {
- FileStoreScan scan = store.newScan().withIncremental(incremental);
- return new TableScan(scan, schema, store.pathFactory()) {
+ public TableScan newScan() {
+ return new TableScan(store.newScan(), schema, store.pathFactory()) {
@Override
protected void withNonPartitionFilter(Predicate predicate) {
scan.withKeyFilter(predicate);
@@ -75,14 +78,14 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableRead newRead(boolean incremental) {
- FileStoreRead read = store.newRead().withDropDelete(!incremental);
- return new TableRead(read) {
+ public TableRead newRead() {
+ return new TableRead(store.newRead()) {
private int[][] projection = null;
+ private boolean isIncremental = false;
@Override
public TableRead withProjection(int[][] projection) {
- if (incremental) {
+ if (isIncremental) {
read.withKeyProjection(projection);
} else {
this.projection = projection;
@@ -90,6 +93,13 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
return this;
}
+ @Override
+ public TableRead withIncremental(boolean isIncremental) {
+ this.isIncremental = isIncremental;
+ read.withDropDelete(!isIncremental);
+ return this;
+ }
+
@Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
@@ -99,7 +109,32 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public FileStore fileStore() {
+ public TableWrite newWrite() {
+ SinkRecordConverter recordConverter =
+ new SinkRecordConverter(store.options().bucket(), schema);
+ return new TableWrite(store.newWrite(), recordConverter) {
+ @Override
+        protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ throws Exception {
+ switch (record.row().getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+                        writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+                        writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+                            "Unknown row kind " + record.row().getRowKind());
+ }
+ }
+ };
+ }
+
+ @Override
+ public FileStore store() {
return store;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index af7ae532..fb56416b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -24,22 +24,28 @@ import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecordConverter;
+import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with primary keys. */
@@ -56,7 +62,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
// add _KEY_ prefix to avoid conflict with value
RowType keyType =
new RowType(
- schema.logicalPrimaryKeysType().getFields().stream()
+                        schema.logicalTrimmedPrimaryKeysType().getFields().stream()
.map(
f ->
new RowType.RowField(
@@ -98,26 +104,51 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public TableScan newScan(boolean incremental) {
- FileStoreScan scan = store.newScan().withIncremental(incremental);
- return new TableScan(scan, schema, store.pathFactory()) {
+ public TableScan newScan() {
+ return new TableScan(store.newScan(), schema, store.pathFactory()) {
@Override
protected void withNonPartitionFilter(Predicate predicate) {
- scan.withValueFilter(predicate);
+ // currently we can only perform filter push down on keys
+ // consider this case:
+ // data file 1: insert key = a, value = 1
+ // data file 2: update key = a, value = 2
+ // filter: value = 1
+            // if we perform filter push down on values, data file 1 will be chosen, but data
+            // file 2 will be ignored, and the final result will be key = a, value = 1 while the
+ // correct result is an empty set
+ // TODO support value filter
+ List<String> trimmedPrimaryKeys = schema.trimmedPrimaryKeys();
+ int[] fieldIdxToKeyIdx =
+ schema.fields().stream()
+                            .mapToInt(f -> trimmedPrimaryKeys.indexOf(f.name()))
+ .toArray();
+ List<Predicate> keyFilters = new ArrayList<>();
+ for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+                Optional<Predicate> mapped = mapFilterFields(p, fieldIdxToKeyIdx);
+ mapped.ifPresent(keyFilters::add);
+ }
+ if (keyFilters.size() > 0) {
+ scan.withKeyFilter(PredicateBuilder.and(keyFilters));
+ }
}
};
}
@Override
- public TableRead newRead(boolean incremental) {
- FileStoreRead read = store.newRead().withDropDelete(!incremental);
- return new TableRead(read) {
+ public TableRead newRead() {
+ return new TableRead(store.newRead()) {
@Override
public TableRead withProjection(int[][] projection) {
read.withValueProjection(projection);
return this;
}
+ @Override
+ public TableRead withIncremental(boolean isIncremental) {
+ read.withDropDelete(!isIncremental);
+ return this;
+ }
+
@Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
@@ -127,7 +158,32 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public FileStore fileStore() {
+ public TableWrite newWrite() {
+ SinkRecordConverter recordConverter =
+ new SinkRecordConverter(store.options().bucket(), schema);
+ return new TableWrite(store.newWrite(), recordConverter) {
+ @Override
+        protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ throws Exception {
+ switch (record.row().getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+                        writer.write(ValueKind.ADD, record.primaryKey(), record.row());
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+                        writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+                            "Unknown row kind " + record.row().getRowKind());
+ }
+ }
+ };
+ }
+
+ @Override
+ public FileStore store() {
return store;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 0c9d148b..65645f76 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,8 +18,8 @@
package org.apache.flink.table.store.table;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.types.logical.RowType;
@@ -36,11 +36,11 @@ public interface FileStoreTable extends Serializable {
RowType rowType();
- TableScan newScan(boolean incremental);
+ TableScan newScan();
- TableRead newRead(boolean incremental);
+ TableRead newRead();
- // TODO remove this once TableWrite is introduced
- @VisibleForTesting
- FileStore fileStore();
+ TableWrite newWrite();
+
+ TableCommit newCommit();
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
similarity index 97%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
index e088d1d3..e2a78286 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/FileCommittable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.connector.sink;
+package org.apache.flink.table.store.table.sink;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.mergetree.Increment;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecord.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecord.java
index 4fccccae..e43f74f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecord.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.sink;
+package org.apache.flink.table.store.table.sink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
similarity index 89%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 112f7726..3a2aed45 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -16,12 +16,13 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.sink;
+package org.apache.flink.table.store.table.sink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@@ -43,6 +44,15 @@ public class SinkRecordConverter {
@Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
+ public SinkRecordConverter(int numBucket, Schema schema) {
+ this(
+ numBucket,
+ schema.logicalRowType(),
+ schema.projection(schema.partitionKeys()),
+ schema.projection(schema.trimmedPrimaryKeys()),
+ schema.projection(schema.primaryKeys()));
+ }
+
public SinkRecordConverter(
int numBucket,
RowType inputType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
new file mode 100644
index 00000000..b8f7da95
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide
+ * snapshot commit and expiration.
+ */
+public class TableCommit {
+
+ private final FileStoreCommit commit;
+ private final FileStoreExpire expire;
+
+ @Nullable private Map<String, String> overwritePartition = null;
+
+ public TableCommit(FileStoreCommit commit, FileStoreExpire expire) {
+ this.commit = commit;
+ this.expire = expire;
+ }
+
+    public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) {
+ this.overwritePartition = overwritePartition;
+ return this;
+ }
+
+ public List<ManifestCommittable> filterRecoveredCommittables(
+ List<ManifestCommittable> committables) {
+ return commit.filterCommitted(committables);
+ }
+
+    public void commit(String identifier, List<FileCommittable> fileCommittables) {
+ ManifestCommittable committable = new ManifestCommittable(identifier);
+ for (FileCommittable fileCommittable : fileCommittables) {
+ committable.addFileCommittable(
+ fileCommittable.partition(),
+ fileCommittable.bucket(),
+ fileCommittable.increment());
+ }
+ commit(Collections.singletonList(committable));
+ }
+
+ public void commit(List<ManifestCommittable> committables) {
+ if (overwritePartition == null) {
+ for (ManifestCommittable committable : committables) {
+ commit.commit(committable, new HashMap<>());
+ }
+ } else {
+ for (ManifestCommittable committable : committables) {
+                commit.overwrite(overwritePartition, committable, new HashMap<>());
+ }
+ }
+ expire.expire();
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
new file mode 100644
index 00000000..c2502be2
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** An abstraction layer above {@link FileStoreWrite} to provide {@link RowData} writing. */
+public abstract class TableWrite {
+
+ private final FileStoreWrite write;
+ private final SinkRecordConverter recordConverter;
+
+ private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
+ private final ExecutorService compactExecutor;
+
+ private boolean overwrite = false;
+
+    protected TableWrite(FileStoreWrite write, SinkRecordConverter recordConverter) {
+ this.write = write;
+ this.recordConverter = recordConverter;
+
+ this.writers = new HashMap<>();
+ this.compactExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("compaction-thread"));
+ }
+
+ public TableWrite withOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ return this;
+ }
+
+ public void write(RowData rowData) throws Exception {
+ SinkRecord record = recordConverter.convert(rowData);
+ RecordWriter writer = getWriter(record.partition(), record.bucket());
+ writeSinkRecord(record, writer);
+ }
+
+ public List<FileCommittable> prepareCommit() throws Exception {
+ List<FileCommittable> result = new ArrayList<>();
+
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
+ writers.entrySet().iterator();
+ while (partIter.hasNext()) {
            Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
+ BinaryRowData partition = partEntry.getKey();
+ Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
+ partEntry.getValue().entrySet().iterator();
+ while (bucketIter.hasNext()) {
+ Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
+ int bucket = entry.getKey();
+ RecordWriter writer = entry.getValue();
+ FileCommittable committable =
                        new FileCommittable(partition, bucket, writer.prepareCommit());
+ result.add(committable);
+
+ // clear if no update
+                // we need a mechanism to clear writers, otherwise more and more writers will
+                // pile up, e.g. for yesterday's partitions that are no longer written to
+ if (committable.increment().newFiles().isEmpty()) {
+ closeWriter(writer);
+ bucketIter.remove();
+ }
+ }
+
+ if (partEntry.getValue().isEmpty()) {
+ partIter.remove();
+ }
+ }
+
+ return result;
+ }
+
+ private void closeWriter(RecordWriter writer) throws Exception {
+ writer.sync();
+ writer.close();
+ }
+
+ public void close() throws Exception {
+ compactExecutor.shutdownNow();
+ for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
+ for (RecordWriter writer : bucketWriters.values()) {
+ closeWriter(writer);
+ }
+ }
+ writers.clear();
+ }
+
+    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter writer)
+ throws Exception;
+
+ private RecordWriter getWriter(BinaryRowData partition, int bucket) {
+ Map<Integer, RecordWriter> buckets = writers.get(partition);
+ if (buckets == null) {
+ buckets = new HashMap<>();
+ writers.put(partition.copy(), buckets);
+ }
+ return buckets.computeIfAbsent(
+ bucket,
+ k ->
+ overwrite
+                                ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
+                                : write.createWriter(partition.copy(), bucket, compactExecutor));
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index be694bbc..53ff22e6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -50,6 +50,8 @@ public abstract class TableRead {
public abstract TableRead withProjection(int[][] projection);
+ public abstract TableRead withIncremental(boolean isIncremental);
+
public RecordReader<RowData> createReader(
BinaryRowData partition, int bucket, List<DataFileMeta> files)
throws IOException {
        return new RowDataRecordReader(read.createReader(partition, bucket, files));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 665a8053..7689902d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.predicate.And;
import org.apache.flink.table.store.file.predicate.CompoundPredicate;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
import org.apache.flink.table.store.file.predicate.Predicate;
@@ -64,7 +63,7 @@ public abstract class TableScan {
List<Predicate> partitionFilters = new ArrayList<>();
List<Predicate> nonPartitionFilters = new ArrayList<>();
for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-            Optional<Predicate> mapped = mapToPartitionFilter(p, fieldIdxToPartitionIdx);
+            Optional<Predicate> mapped = mapFilterFields(p, fieldIdxToPartitionIdx);
if (mapped.isPresent()) {
partitionFilters.add(mapped.get());
} else {
@@ -72,8 +71,17 @@ public abstract class TableScan {
}
}
-        scan.withPartitionFilter(new CompoundPredicate(And.INSTANCE, partitionFilters));
-        withNonPartitionFilter(new CompoundPredicate(And.INSTANCE, nonPartitionFilters));
+ if (partitionFilters.size() > 0) {
+ scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+ }
+ if (nonPartitionFilters.size() > 0) {
+ withNonPartitionFilter(PredicateBuilder.and(nonPartitionFilters));
+ }
+ return this;
+ }
+
+ public TableScan withIncremental(boolean isIncremental) {
+ scan.withIncremental(isIncremental);
return this;
}
@@ -86,13 +94,12 @@ public abstract class TableScan {
protected abstract void withNonPartitionFilter(Predicate predicate);
- private Optional<Predicate> mapToPartitionFilter(
- Predicate predicate, int[] fieldIdxToPartitionIdx) {
+    protected Optional<Predicate> mapFilterFields(Predicate predicate, int[] fieldIdxMapping) {
if (predicate instanceof CompoundPredicate) {
            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
List<Predicate> children = new ArrayList<>();
for (Predicate child : compoundPredicate.children()) {
-                Optional<Predicate> mapped = mapToPartitionFilter(child, fieldIdxToPartitionIdx);
+                Optional<Predicate> mapped = mapFilterFields(child, fieldIdxMapping);
if (mapped.isPresent()) {
children.add(mapped.get());
} else {
@@ -102,7 +109,7 @@ public abstract class TableScan {
            return Optional.of(new CompoundPredicate(compoundPredicate.function(), children));
} else {
LeafPredicate leafPredicate = (LeafPredicate) predicate;
- int mapped = fieldIdxToPartitionIdx[leafPredicate.index()];
+ int mapped = fieldIdxMapping[leafPredicate.index()];
if (mapped >= 0) {
return Optional.of(
new LeafPredicate(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
new file mode 100644
index 00000000..b69ebb1f
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AppendOnlyFileStoreTable}. */
+// TODO enable this test class after append only file store with avro format is fixed
+@Disabled
+public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testBatchReadWrite() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList("1|10|100", "1|11|101", "1|12|102", "1|11|101", "1|12|102"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
+ }
+
+ @Test
+ public void testBatchProjection() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead().withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("100|10", "101|11", "102|12", "101|11", "102|12"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("200|20", "201|21", "202|22", "201|21"));
+ }
+
+ @Test
+ public void testBatchFilter() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                "2|21|201",
+                                // this record is in the same file with the first "2|21|201"
+                                "2|22|202",
+                                "2|21|201"));
+ }
+
+ @Test
+ public void testStreamingReadWrite() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
+                .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+ .hasSameElementsAs(Collections.singletonList("+2|21|201"));
+ }
+
+ @Test
+ public void testStreamingProjection() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+101|11", "+102|12"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
+ .hasSameElementsAs(Collections.singletonList("+201|21"));
+ }
+
+ @Test
+ public void testStreamingFilter() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 101L));
+        List<Split> splits =
+                table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
+                .isEqualTo(
+                        Arrays.asList(
+                                "+1|11|101",
+                                // this record is in the same file with "+1|11|101"
+                                "+1|12|102"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
+ }
+
+ private void writeData() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableWrite write = table.newWrite();
+
+ write.write(GenericRowData.of(1, 10, 100L));
+ write.write(GenericRowData.of(2, 20, 200L));
+ write.write(GenericRowData.of(1, 11, 101L));
+ table.newCommit().commit("0", write.prepareCommit());
+
+ write.write(GenericRowData.of(1, 12, 102L));
+ write.write(GenericRowData.of(2, 21, 201L));
+ write.write(GenericRowData.of(2, 22, 202L));
+ table.newCommit().commit("1", write.prepareCommit());
+
+ write.write(GenericRowData.of(1, 11, 101L));
+ write.write(GenericRowData.of(2, 21, 201L));
+ write.write(GenericRowData.of(1, 12, 102L));
+ table.newCommit().commit("2", write.prepareCommit());
+
+ write.close();
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Path tablePath = new Path(tempDir.toString());
+ Configuration conf = new Configuration();
+ conf.set(FileStoreOptions.PATH, tablePath.toString());
+ conf.set(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.set(FileStoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ Schema schema =
+ new SchemaManager(tablePath)
+ .commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+        return new AppendOnlyFileStoreTable(tablePath.getName(), schema, "user");
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
new file mode 100644
index 00000000..bed0cad7
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogValueCountFileStoreTable}. */
+public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testBatchReadWrite() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("1|11|101", "1|12|102", "1|11|101"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
+ }
+
+ @Test
+ public void testBatchProjection() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead().withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("101|11", "102|12", "101|11"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("200|20", "201|21", "202|22"));
+ }
+
+ @Test
+ public void testBatchFilter() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "2|21|201",
+                                // this record is in the same file with "delete 2|21|201"
+                                "2|22|202"));
+ }
+
+ @Test
+ public void testStreamingReadWrite() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+1|11|101", "-1|10|100"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+ .hasSameElementsAs(Arrays.asList("+2|22|202", "-2|21|201"));
+ }
+
+ @Test
+ public void testStreamingProjection() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+101|11", "-100|10"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+202|22", "-201|21"));
+ }
+
+ @Test
+ public void testStreamingFilter() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.equal(
+                        2, Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L));
+        List<Split> splits =
+                table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "-2|21|201",
+                                // this record is in the same file as "delete 2|21|201"
+                                "+2|22|202"));
+ }
+
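+    // Writes three commits. In value-count mode the table keeps a count per
+    // distinct row: duplicate inserts raise the count and DELETE rows lower it,
+    // so (1, 10, 100L) no longer appears in batch reads after commit "2".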
+ private void writeData() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableWrite write = table.newWrite();
+
+ write.write(GenericRowData.of(1, 10, 100L));
+ write.write(GenericRowData.of(2, 20, 200L));
+ write.write(GenericRowData.of(1, 11, 101L));
+ table.newCommit().commit("0", write.prepareCommit());
+
+ write.write(GenericRowData.of(1, 12, 102L));
+ write.write(GenericRowData.of(2, 21, 201L));
+ write.write(GenericRowData.of(2, 21, 201L));
+ table.newCommit().commit("1", write.prepareCommit());
+
+ write.write(GenericRowData.of(1, 11, 101L));
+ write.write(GenericRowData.of(2, 22, 202L));
+ write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
+ write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
+ table.newCommit().commit("2", write.prepareCommit());
+
+ write.close();
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Path tablePath = new Path(tempDir.toString());
+ Configuration conf = new Configuration();
+ conf.set(FileStoreOptions.PATH, tablePath.toString());
+ conf.set(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ Schema schema =
+ new SchemaManager(tablePath)
+ .commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+        return new ChangelogValueCountFileStoreTable(tablePath.getName(), schema, "user");
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
new file mode 100644
index 00000000..e575651b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
+public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testBatchReadWrite() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("1|10|1000"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("2|21|20001", "2|22|202"));
+ }
+
+ @Test
+ public void testBatchProjection() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead().withProjection(PROJECTION);
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("1000|10"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("20001|21", "202|22"));
+ }
+
+ @Test
+ public void testBatchFilter() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.equal(
+                                2,
+                                Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L)),
+                        PredicateBuilder.equal(
+                                1, Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21)));
+        List<Split> splits = table.newScan().withFilter(predicate).plan().splits;
+        TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                // only the filter on the key should be applied,
+                                // and records from the same file should also be selected
+                                "2|21|20001", "2|22|202"));
+ }
+
+ @Test
+ public void testStreamingReadWrite() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("-1|11|1001"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+2|21|20001", "+2|22|202", "-2|20|200"));
+ }
+
+ @Test
+ public void testStreamingProjection() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        List<Split> splits = table.newScan().withIncremental(true).plan().splits;
+        TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
+
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("-1001|11"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
+                .hasSameElementsAs(Arrays.asList("+20001|21", "+202|22", "-200|20"));
+ }
+
+ @Test
+ public void testStreamingFilter() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.equal(
+                                2,
+                                Literal.fromJavaObject(DataTypes.BIGINT().getLogicalType(), 201L)),
+                        PredicateBuilder.equal(
+                                1, Literal.fromJavaObject(DataTypes.INT().getLogicalType(), 21)));
+        List<Split> splits =
+                table.newScan().withIncremental(true).withFilter(predicate).plan().splits;
+        TableRead read = table.newRead().withIncremental(true);
+        assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
+        assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                // only the filter on the key should be applied,
+                                // and records from the same file should also be selected
+                                "+2|21|20001", "+2|22|202", "-2|20|200"));
+ }
+
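+    // Writes three commits against primary key (pt, a): a later value overwrites
+    // the earlier one for the same key, and a DELETE row removes the key, so
+    // batch reads see only the latest surviving value per key.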
+ private void writeData() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableWrite write = table.newWrite();
+
+ write.write(GenericRowData.of(1, 10, 100L));
+ write.write(GenericRowData.of(2, 20, 200L));
+ write.write(GenericRowData.of(1, 11, 101L));
+ table.newCommit().commit("0", write.prepareCommit());
+
+ write.write(GenericRowData.of(1, 10, 1000L));
+ write.write(GenericRowData.of(2, 21, 201L));
+ write.write(GenericRowData.of(2, 21, 2001L));
+ table.newCommit().commit("1", write.prepareCommit());
+
+ write.write(GenericRowData.of(1, 11, 1001L));
+ write.write(GenericRowData.of(2, 21, 20001L));
+ write.write(GenericRowData.of(2, 22, 202L));
+ write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
+ write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
+ table.newCommit().commit("2", write.prepareCommit());
+
+ write.close();
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Path tablePath = new Path(tempDir.toString());
+ Configuration conf = new Configuration();
+ conf.set(FileStoreOptions.PATH, tablePath.toString());
+ conf.set(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ Schema schema =
+ new SchemaManager(tablePath)
+ .commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ conf.toMap(),
+ ""));
+        return new ChangelogWithKeyFileStoreTable(tablePath.getName(), schema, "user");
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
new file mode 100644
index 00000000..0dd8171b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link FileStoreTable}. */
+public abstract class FileStoreTableTestBase {
+
+ protected static final RowType ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"pt", "a", "b"});
+ protected static final int[] PROJECTION = new int[] {2, 1};
+ protected static final Function<RowData, String> BATCH_ROW_TO_STRING =
+            rowData -> rowData.getInt(0) + "|" + rowData.getInt(1) + "|" + rowData.getLong(2);
+    protected static final Function<RowData, String> BATCH_PROJECTED_ROW_TO_STRING =
+ rowData -> rowData.getLong(0) + "|" + rowData.getInt(1);
+ protected static final Function<RowData, String> STREAMING_ROW_TO_STRING =
+ rowData ->
+ (rowData.getRowKind() == RowKind.INSERT ? "+" : "-")
+ + BATCH_ROW_TO_STRING.apply(rowData);
+    protected static final Function<RowData, String> STREAMING_PROJECTED_ROW_TO_STRING =
+ rowData ->
+ (rowData.getRowKind() == RowKind.INSERT ? "+" : "-")
+ + BATCH_PROJECTED_ROW_TO_STRING.apply(rowData);
+
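+    // The overwrite commit below replaces only partition pt=2; the data written
+    // to pt=1 by the first commit must survive.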
+ @Test
+ public void testOverwrite() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1, 10, 100L));
+ write.write(GenericRowData.of(2, 20, 200L));
+ table.newCommit().commit("0", write.prepareCommit());
+ write.close();
+
+ write = table.newWrite().withOverwrite(true);
+ write.write(GenericRowData.of(2, 21, 201L));
+ Map<String, String> overwritePartition = new HashMap<>();
+ overwritePartition.put("pt", "2");
+ table.newCommit()
+ .withOverwritePartition(overwritePartition)
+ .commit("1", write.prepareCommit());
+ write.close();
+
+ List<Split> splits = table.newScan().plan().splits;
+ TableRead read = table.newRead();
+        assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("1|10|100"));
+        assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(Collections.singletonList("2|21|201"));
+ }
+
+ protected List<String> getResult(
+ TableRead read,
+ List<Split> splits,
+ BinaryRowData partition,
+ int bucket,
+ Function<RowData, String> rowDataToString)
+ throws Exception {
+ RecordReader<RowData> recordReader =
+                read.createReader(partition, bucket, getFilesFor(splits, partition, bucket));
+        RecordReaderIterator<RowData> iterator = new RecordReaderIterator<>(recordReader);
+ List<String> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ result.add(rowDataToString.apply(rowData));
+ }
+ iterator.close();
+ return result;
+ }
+
+ private List<DataFileMeta> getFilesFor(
+ List<Split> splits, BinaryRowData partition, int bucket) {
+ List<DataFileMeta> result = new ArrayList<>();
+ for (Split split : splits) {
+            if (split.partition().equals(partition) && split.bucket() == bucket) {
+ result.addAll(split.files());
+ }
+ }
+ return result;
+ }
+
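+    // Builds the single-column partition row (pt) used to select splits in tests.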
+ protected BinaryRowData binaryRow(int a) {
+ BinaryRowData b = new BinaryRowData(1);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, a);
+ writer.complete();
+ return b;
+ }
+
+ protected abstract FileStoreTable createFileStoreTable() throws Exception;
+}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 25f9dcda..8653682f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -50,7 +50,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
         FileStoreTable table = createFileStoreTable(jobConf);
-        TableScan scan = table.newScan(false);
+        TableScan scan = table.newScan();
         createPredicate(jobConf).ifPresent(scan::withFilter);
         return scan.plan().splits.stream()
                 .map(TableStoreInputSplit::create)
@@ -64,7 +64,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
         TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
         long splitLength = split.getLength();
         return new TableStoreRecordReader(
-                table.newRead(false).createReader(split.partition(), split.bucket(), split.files()),
+                table.newRead().createReader(split.partition(), split.bucket(), split.files()),
splitLength);
}
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
deleted file mode 100644
index 3d79d694..00000000
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.types.logical.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-
-/** Helper class to write and read {@link RowData} with {@link FileStoreImpl}. */
-public class FileStoreTestHelper {
-
- private final FileStoreTable table;
- private final FileStore store;
-    private final BiFunction<RowData, RowData, BinaryRowData> partitionCalculator;
- private final Function<RowData, Integer> bucketCalculator;
- private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
- private final ExecutorService compactExecutor;
-
- public FileStoreTestHelper(
- Configuration conf,
- RowType rowType,
- List<String> partitionKeys,
- List<String> primaryKeys,
- BiFunction<RowData, RowData, BinaryRowData> partitionCalculator,
- Function<RowData, Integer> bucketCalculator)
- throws Exception {
- Path tablePath = FileStoreOptions.path(conf);
- new SchemaManager(tablePath)
- .commitNewVersion(
-                        new UpdateSchema(rowType, partitionKeys, primaryKeys, conf.toMap(), ""));
-
- // only path, other config should be read from file store.
- conf = new Configuration();
- conf.set(PATH, tablePath.toString());
-
- this.table = FileStoreTableFactory.create(conf, "user");
- this.store = table.fileStore();
- this.partitionCalculator = partitionCalculator;
- this.bucketCalculator = bucketCalculator;
- this.writers = new HashMap<>();
- this.compactExecutor = Executors.newSingleThreadExecutor();
- }
-
-    public void write(ValueKind kind, RowData key, RowData value) throws Exception {
-        BinaryRowData partition = partitionCalculator.apply(key, value);
-        int bucket = bucketCalculator.apply(key);
-        RecordWriter writer =
-                writers.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
-                        .compute(
-                                bucket,
-                                (b, w) -> {
-                                    if (w == null) {
-                                        FileStoreWrite write = store.newWrite();
-                                        return write.createWriter(
-                                                partition, bucket, compactExecutor);
-                                    } else {
-                                        return w;
-                                    }
-                                });
-        writer.write(kind, key, value);
- }
-
- public void commit() throws Exception {
-        ManifestCommittable committable = new ManifestCommittable(UUID.randomUUID().toString());
-        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
-                writers.entrySet()) {
-            for (Map.Entry<Integer, RecordWriter> entryWithBucket :
-                    entryWithPartition.getValue().entrySet()) {
-                RecordWriter writer = entryWithBucket.getValue();
-                writer.sync();
-                Increment increment = writer.prepareCommit();
-                committable.addFileCommittable(
-                        entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
- writer.close();
- }
- }
- writers.clear();
- FileStoreCommit commit = store.newCommit();
- commit.commit(committable, Collections.emptyMap());
- }
-
-    public Tuple2<RecordReader<RowData>, Long> read(BinaryRowData partition, int bucket)
-            throws Exception {
-        for (Split split : table.newScan(false).plan().splits) {
-            if (split.partition().equals(partition) && split.bucket() == bucket) {
-                return Tuple2.of(
-                        table.newRead(false).createReader(partition, bucket, split.files()),
-                        split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
-            }
-        }
-        throw new IllegalArgumentException(
-                "Input split not found for partition " + partition + " and bucket " + bucket);
- }
-}
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
new file mode 100644
index 00000000..10721be9
--- /dev/null
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+
+/** Test utils related to {@link org.apache.flink.table.store.file.FileStore}. */
+public class FileStoreTestUtils {
+
+ public static FileStoreTable createFileStoreTable(
+ Configuration conf,
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys)
+ throws Exception {
+ Path tablePath = FileStoreOptions.path(conf);
+ new SchemaManager(tablePath)
+ .commitNewVersion(
+                        new UpdateSchema(rowType, partitionKeys, primaryKeys, conf.toMap(), ""));
+
+        // keep only the path; other config should be read from the file store
+ conf = new Configuration();
+ conf.set(PATH, tablePath.toString());
+ return FileStoreTableFactory.create(conf, "user");
+ }
+}
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 2ff8d7a3..d0a87ccf 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -24,12 +24,13 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.FileStoreTestHelper;
+import org.apache.flink.table.store.FileStoreTestUtils;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -85,8 +86,8 @@ public class TableStoreHiveStorageHandlerITCase {
conf.setString(FileStoreOptions.PATH, path);
conf.setInteger(FileStoreOptions.BUCKET, 2);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
new LogicalType[] {
@@ -96,35 +97,17 @@ public class TableStoreHiveStorageHandlerITCase {
},
new String[] {"a", "b", "c"}),
Collections.emptyList(),
- Arrays.asList("a", "b"),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> k.getInt(0) % 2);
+ Arrays.asList("a", "b"));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, 10L),
- GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, 20L),
- GenericRowData.of(1, 20L, StringData.fromString("Hello")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(2, 30L),
- GenericRowData.of(2, 30L, StringData.fromString("World")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, 10L),
- GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
- helper.write(
- ValueKind.DELETE,
- GenericRowData.of(2, 30L),
- GenericRowData.of(2, 30L, StringData.fromString("World")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(2, 40L),
- GenericRowData.of(2, 40L, StringData.fromString("Test")));
- helper.commit();
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+ write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi
Again")));
+ write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L,
StringData.fromString("World")));
+ write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
+ table.newCommit().commit("0", write.prepareCommit());
+ write.close();
hiveShell.execute(
String.join(
@@ -136,13 +119,7 @@ public class TableStoreHiveStorageHandlerITCase {
" c STRING",
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'",
- "TBLPROPERTIES (",
- " 'table-store.catalog' = 'test_catalog',",
- " 'table-store.primary-keys' = 'a,b',",
- " 'table-store.bucket' = '2',",
- " 'table-store.file.format' = 'avro'",
- ")")));
+ "LOCATION '" + path + "'")));
        List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
        List<String> expected = Arrays.asList("10\t1\tHi Again", "20\t1\tHello", "40\t2\tTest");
Assert.assertEquals(expected, actual);
@@ -155,8 +132,8 @@ public class TableStoreHiveStorageHandlerITCase {
conf.setString(FileStoreOptions.PATH, path);
conf.setInteger(FileStoreOptions.BUCKET, 2);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
new LogicalType[] {
@@ -166,35 +143,17 @@ public class TableStoreHiveStorageHandlerITCase {
},
new String[] {"a", "b", "c"}),
Collections.emptyList(),
- Collections.emptyList(),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> k.getInt(0) % 2);
+ Collections.emptyList());
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, 10L, StringData.fromString("Hi")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, 20L, StringData.fromString("Hello")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(2, 30L, StringData.fromString("World")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, 10L, StringData.fromString("Hi")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.DELETE,
- GenericRowData.of(2, 30L, StringData.fromString("World")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(2, 40L, StringData.fromString("Test")),
- GenericRowData.of(1L));
- helper.commit();
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+ write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
+ write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
+ table.newCommit().commit("0", write.prepareCommit());
+ write.close();
hiveShell.execute(
String.join(
@@ -206,12 +165,7 @@ public class TableStoreHiveStorageHandlerITCase {
" c STRING",
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'",
- "TBLPROPERTIES (",
- " 'table-store.catalog' = 'test_catalog',",
- " 'table-store.bucket' = '2',",
- " 'table-store.file.format' = 'avro'",
- ")")));
+ "LOCATION '" + path + "'")));
        List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
        List<String> expected =
                Arrays.asList("10\t1\tHi", "10\t1\tHi", "20\t1\tHello", "40\t2\tTest");
@@ -224,8 +178,8 @@ public class TableStoreHiveStorageHandlerITCase {
Configuration conf = new Configuration();
conf.setString(FileStoreOptions.PATH, root);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
RandomGenericRowDataGenerator.TYPE_INFOS.stream()
@@ -236,9 +190,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                .toArray(LogicalType[]::new),
                        RandomGenericRowDataGenerator.FIELD_NAMES.toArray(new String[0])),
Collections.emptyList(),
- Collections.singletonList("f_int"),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> 0);
+ Collections.singletonList("f_int"));
ThreadLocalRandom random = ThreadLocalRandom.current();
List<GenericRowData> input = new ArrayList<>();
@@ -252,10 +204,13 @@ public class TableStoreHiveStorageHandlerITCase {
}
}
}
+
+ TableWrite write = table.newWrite();
for (GenericRowData rowData : input) {
-            helper.write(ValueKind.ADD, GenericRowData.of(rowData.getInt(3)), rowData);
+ write.write(rowData);
}
- helper.commit();
+ table.newCommit().commit("0", write.prepareCommit());
+ write.close();
StringBuilder ddl = new StringBuilder();
        for (int i = 0; i < RandomGenericRowDataGenerator.FIELD_NAMES.size(); i++) {
@@ -365,31 +320,31 @@ public class TableStoreHiveStorageHandlerITCase {
Configuration conf = new Configuration();
conf.setString(FileStoreOptions.PATH, path);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
                        new LogicalType[] {DataTypes.INT().getLogicalType()},
new String[] {"a"}),
Collections.emptyList(),
- Collections.emptyList(),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> 0);
+ Collections.emptyList());
        // TODO add NaN related tests after FLINK-27627 and FLINK-27628 are fixed
-        helper.write(ValueKind.ADD, GenericRowData.of(1), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of((Object) null), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of(2), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of(3), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of((Object) null), GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of(4), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of(5), GenericRowData.of(1L));
-        helper.write(ValueKind.ADD, GenericRowData.of(6), GenericRowData.of(1L));
-        helper.commit();
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1));
+ table.newCommit().commit("0", write.prepareCommit());
+ write.write(GenericRowData.of((Object) null));
+ table.newCommit().commit("1", write.prepareCommit());
+ write.write(GenericRowData.of(2));
+ write.write(GenericRowData.of(3));
+ write.write(GenericRowData.of((Object) null));
+ table.newCommit().commit("2", write.prepareCommit());
+ write.write(GenericRowData.of(4));
+ write.write(GenericRowData.of(5));
+ write.write(GenericRowData.of(6));
+ table.newCommit().commit("3", write.prepareCommit());
+ write.close();
hiveShell.execute(
String.join(
@@ -454,8 +409,8 @@ public class TableStoreHiveStorageHandlerITCase {
Configuration conf = new Configuration();
conf.setString(FileStoreOptions.PATH, path);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
new LogicalType[] {
@@ -464,30 +419,24 @@ public class TableStoreHiveStorageHandlerITCase {
},
new String[] {"dt", "ts"}),
Collections.emptyList(),
- Collections.emptyList(),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> 0);
+ Collections.emptyList());
- helper.write(
- ValueKind.ADD,
+ TableWrite write = table.newWrite();
+ write.write(
GenericRowData.of(
375, /* 1971-01-11 */
-                        TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 5, 17, 17, 29, 20))),
-                GenericRowData.of(1L));
-        helper.commit();
-        helper.write(ValueKind.ADD, GenericRowData.of(null, null), GenericRowData.of(1L));
- helper.commit();
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(376 /* 1971-01-12 */, null),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
+ table.newCommit().commit("0", write.prepareCommit());
+ write.write(GenericRowData.of(null, null));
+ table.newCommit().commit("1", write.prepareCommit());
+ write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
+ write.write(
GenericRowData.of(
null,
-                        TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))),
-                GenericRowData.of(1L));
-        helper.commit();
+                        TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
+ table.newCommit().commit("2", write.prepareCommit());
+ write.close();
hiveShell.execute(
String.join(
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 0edb2548..064a239e 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -24,14 +24,19 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.FileStoreTestHelper;
+import org.apache.flink.table.store.FileStoreTestUtils;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -54,8 +59,8 @@ public class TableStoreRecordReaderTest {
Configuration conf = new Configuration();
conf.setString(FileStoreOptions.PATH, tempDir.toString());
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
new LogicalType[] {
@@ -64,33 +69,17 @@ public class TableStoreRecordReaderTest {
},
new String[] {"a", "b"}),
Collections.emptyList(),
- Collections.singletonList("a"),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> 0);
-
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1L),
- GenericRowData.of(1L, StringData.fromString("Hi")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(2L),
- GenericRowData.of(2L, StringData.fromString("Hello")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(3L),
- GenericRowData.of(3L, StringData.fromString("World")));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1L),
- GenericRowData.of(1L, StringData.fromString("Hi again")));
- helper.write(
- ValueKind.DELETE,
- GenericRowData.of(2L),
- GenericRowData.of(2L, StringData.fromString("Hello")));
- helper.commit();
-
-        Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+ Collections.singletonList("a"));
+
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1L, StringData.fromString("Hi")));
+ write.write(GenericRowData.of(2L, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(3L, StringData.fromString("World")));
+ write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
+        table.newCommit().commit("0", write.prepareCommit());
+
+        Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
RowDataContainer container = reader.createValue();
Set<String> actual = new HashSet<>();
@@ -111,8 +100,8 @@ public class TableStoreRecordReaderTest {
Configuration conf = new Configuration();
conf.setString(FileStoreOptions.PATH, tempDir.toString());
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
- FileStoreTestHelper helper =
- new FileStoreTestHelper(
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
new LogicalType[] {
@@ -121,37 +110,18 @@ public class TableStoreRecordReaderTest {
},
new String[] {"a", "b"}),
Collections.emptyList(),
- Collections.emptyList(),
- (k, v) -> BinaryRowDataUtil.EMPTY_ROW,
- k -> 0);
-
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, StringData.fromString("Hi")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(2, StringData.fromString("Hello")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(3, StringData.fromString("World")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, StringData.fromString("Hi")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.DELETE,
- GenericRowData.of(2, StringData.fromString("Hello")),
- GenericRowData.of(1L));
- helper.write(
- ValueKind.ADD,
- GenericRowData.of(1, StringData.fromString("Hi")),
- GenericRowData.of(1L));
- helper.commit();
-
-        Tuple2<RecordReader<RowData>, Long> tuple = helper.read(BinaryRowDataUtil.EMPTY_ROW, 0);
+ Collections.emptyList());
+
+ TableWrite write = table.newWrite();
+ write.write(GenericRowData.of(1, StringData.fromString("Hi")));
+ write.write(GenericRowData.of(2, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(3, StringData.fromString("World")));
+ write.write(GenericRowData.of(1, StringData.fromString("Hi")));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(1, StringData.fromString("Hi")));
+ table.newCommit().commit("0", write.prepareCommit());
+
+        Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
RowDataContainer container = reader.createValue();
Map<String, Integer> actual = new HashMap<>();
@@ -166,4 +136,17 @@ public class TableStoreRecordReaderTest {
expected.put("3|World", 1);
assertThat(actual).isEqualTo(expected);
}
+
+ private Tuple2<RecordReader<RowData>, Long> read(
+            FileStoreTable table, BinaryRowData partition, int bucket) throws Exception {
+        for (Split split : table.newScan().plan().splits) {
+            if (split.partition().equals(partition) && split.bucket() == bucket) {
+                return Tuple2.of(
+                        table.newRead().createReader(partition, bucket, split.files()),
+                        split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
+            }
+        }
+        throw new IllegalArgumentException(
+                "Input split not found for partition " + partition + " and bucket " + bucket);
+ }
}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index 55fab14e..ccc42576 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
import org.apache.kafka.clients.producer.ProducerRecord;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index e084bb9d..1f1b79b8 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
import org.apache.flink.table.store.log.LogOptions.LogConsistency;
import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index 1a48f771..37e7aeea 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 40540093..4e920f04 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -36,7 +36,7 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.log.LogStoreTableFactory;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
index e06d527b..fa4dbdd1 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.Precommitting
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.store.log.LogInitContext;
import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.SinkRecord;
import java.io.IOException;
import java.util.Collection;