This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 2765184 [hotfix] Let FileStore assign key/value/partition type
directly to StoreSink to avoid serialization error caused by incorrect
projection
2765184 is described below
commit 27651841123e0bf82a376788657d28e94e260127
Author: Jane Chan <[email protected]>
AuthorDate: Thu Mar 3 22:02:13 2022 +0800
[hotfix] Let FileStore assign key/value/partition type directly to
StoreSink to avoid serialization error caused by incorrect projection
This closes #35
---
.../table/store/connector/sink/StoreSink.java | 17 +++----
.../store/connector/source/FileStoreSource.java | 24 +++-------
.../table/store/connector/FileStoreITCase.java | 23 +++------
.../table/store/connector/sink/StoreSinkTest.java | 54 +++++++++++++---------
.../table/store/connector/sink/TestFileStore.java | 24 +++++++++-
.../apache/flink/table/store/file/FileStore.java | 7 +++
.../flink/table/store/file/FileStoreImpl.java | 15 ++++++
7 files changed, 98 insertions(+), 66 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index fdd3cbe..9ae59fd 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -30,7 +30,6 @@ import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.sink.SinkRecordConverter;
-import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
@@ -39,8 +38,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
-import static org.apache.flink.table.store.utils.ProjectionUtils.project;
-
/** {@link Sink} of dynamic store. */
public class StoreSink<WriterStateT, LogCommT>
implements StatefulSink<RowData, WriterStateT>,
@@ -52,8 +49,6 @@ public class StoreSink<WriterStateT, LogCommT>
private final FileStore fileStore;
- private final RowType rowType;
-
private final int[] partitions;
private final int[] primaryKeys;
@@ -67,7 +62,6 @@ public class StoreSink<WriterStateT, LogCommT>
public StoreSink(
ObjectIdentifier tableIdentifier,
FileStore fileStore,
- RowType rowType,
int[] partitions,
int[] primaryKeys,
int numBucket,
@@ -75,7 +69,6 @@ public class StoreSink<WriterStateT, LogCommT>
@Nullable Map<String, String> overwritePartition) {
this.tableIdentifier = tableIdentifier;
this.fileStore = fileStore;
- this.rowType = rowType;
this.partitions = partitions;
this.primaryKeys = primaryKeys;
this.numBucket = numBucket;
@@ -93,7 +86,11 @@ public class StoreSink<WriterStateT, LogCommT>
InitContext initContext, Collection<WriterStateT> states) throws
IOException {
return new StoreSinkWriter<>(
fileStore.newWrite(),
- new SinkRecordConverter(numBucket, rowType, partitions,
primaryKeys),
+ new SinkRecordConverter(
+ numBucket,
+ primaryKeys.length > 0 ? fileStore.valueType() :
fileStore.keyType(),
+ partitions,
+ primaryKeys),
fileCommitSerializer(),
overwritePartition != null);
}
@@ -136,14 +133,14 @@ public class StoreSink<WriterStateT, LogCommT>
public GlobalCommittableSerializer<LogCommT>
getGlobalCommittableSerializer() {
ManifestCommittableSerializer fileCommSerializer =
new ManifestCommittableSerializer(
- project(rowType, partitions), project(rowType,
primaryKeys), rowType);
+ fileStore.partitionType(), fileStore.keyType(),
fileStore.valueType());
SimpleVersionedSerializer<LogCommT> logCommitSerializer = new
NoOutputSerializer<>();
return new GlobalCommittableSerializer<>(logCommitSerializer,
fileCommSerializer);
}
private FileCommittableSerializer fileCommitSerializer() {
return new FileCommittableSerializer(
- project(rowType, partitions), project(rowType, primaryKeys),
rowType);
+ fileStore.partitionType(), fileStore.keyType(),
fileStore.valueType());
}
private static class NoOutputSerializer<T> implements
SimpleVersionedSerializer<T> {
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 062a49f..3997d37 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -29,11 +29,9 @@ import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
-import static org.apache.flink.table.store.utils.ProjectionUtils.project;
import static org.apache.flink.util.Preconditions.checkArgument;
/** {@link Source} of file store. */
@@ -44,11 +42,7 @@ public class FileStoreSource
private final FileStore fileStore;
- private final RowType rowType;
-
- private final int[] partitions;
-
- private final int[] primaryKeys;
+ private final boolean keyAsRecord;
@Nullable private final int[][] projectedFields;
@@ -58,16 +52,12 @@ public class FileStoreSource
public FileStoreSource(
FileStore fileStore,
- RowType rowType,
- int[] partitions,
- int[] primaryKeys,
+ boolean keyAsRecord,
@Nullable int[][] projectedFields,
@Nullable Predicate partitionPredicate,
@Nullable Predicate fieldsPredicate) {
this.fileStore = fileStore;
- this.rowType = rowType;
- this.partitions = partitions;
- this.primaryKeys = primaryKeys;
+ this.keyAsRecord = keyAsRecord;
this.projectedFields = projectedFields;
this.partitionPredicate = partitionPredicate;
this.fieldsPredicate = fieldsPredicate;
@@ -83,13 +73,13 @@ public class FileStoreSource
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
FileStoreRead read = fileStore.newRead();
if (projectedFields != null) {
- if (primaryKeys.length == 0) {
+ if (keyAsRecord) {
read.withKeyProjection(projectedFields);
} else {
read.withValueProjection(projectedFields);
}
}
- return new FileStoreSourceReader(context, read, primaryKeys.length ==
0);
+ return new FileStoreSourceReader(context, read, keyAsRecord);
}
@Override
@@ -100,7 +90,7 @@ public class FileStoreSource
scan.withPartitionFilter(partitionPredicate);
}
if (fieldsPredicate != null) {
- if (primaryKeys.length == 0) {
+ if (keyAsRecord) {
scan.withKeyFilter(fieldsPredicate);
} else {
scan.withValueFilter(fieldsPredicate);
@@ -120,7 +110,7 @@ public class FileStoreSource
@Override
public FileStoreSourceSplitSerializer getSplitSerializer() {
return new FileStoreSourceSplitSerializer(
- project(rowType, partitions), project(rowType, primaryKeys),
rowType);
+ fileStore.partitionType(), fileStore.keyType(),
fileStore.valueType());
}
@Override
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index f278086..28f2f8f 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -150,7 +150,7 @@ public class FileStoreITCase extends AbstractTestBase {
write(finiteSource, fileStore, true, overwrite);
// read
- List<Row> results = read(env, fileStore, true);
+ List<Row> results = read(env, fileStore);
Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1),
Row.of(0, "p1", 2)};
assertThat(results).containsExactlyInAnyOrder(expected);
@@ -164,7 +164,7 @@ public class FileStoreITCase extends AbstractTestBase {
write(finiteSource, fileStore, true, new HashMap<>());
// read
- results = read(env, fileStore, true);
+ results = read(env, fileStore);
expected = new Row[] {Row.of(19, "p2", 6)};
assertThat(results).containsExactlyInAnyOrder(expected);
}
@@ -180,7 +180,7 @@ public class FileStoreITCase extends AbstractTestBase {
write(finiteSource, fileStore, partitioned);
// source
- List<Row> results = read(env, fileStore, partitioned);
+ List<Row> results = read(env, fileStore);
Row[] expected =
partitioned
@@ -255,26 +255,15 @@ public class FileStoreITCase extends AbstractTestBase {
int[] keys = new int[] {2};
StoreSink<?, ?> sink =
new StoreSink<>(
- null,
- fileStore,
- VALUE_TYPE,
- partitions,
- keys,
- NUM_BUCKET,
- null,
- overwritePartition);
+ null, fileStore, partitions, keys, NUM_BUCKET, null,
overwritePartition);
input = input.keyBy(row -> row.getInt(2)); // key by
GlobalCommittingSinkTranslator.translate(input, sink);
input.getExecutionEnvironment().execute();
}
- public static List<Row> read(
- StreamExecutionEnvironment env, FileStore fileStore, boolean
partitioned)
+ public static List<Row> read(StreamExecutionEnvironment env, FileStore
fileStore)
throws Exception {
- int[] partitions = partitioned ? new int[] {1} : new int[0];
- int[] keys = new int[] {2};
- FileStoreSource source =
- new FileStoreSource(fileStore, VALUE_TYPE, partitions, keys,
null, null, null);
+ FileStoreSource source = new FileStoreSource(fileStore, false, null,
null, null);
CloseableIterator<RowData> iterator =
env.fromSource(
source,
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 05ad999..fd3d97f 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import
org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@@ -51,6 +52,8 @@ public class StoreSinkTest {
private final boolean hasPk;
+ private final boolean partitioned;
+
private final ObjectIdentifier identifier =
ObjectIdentifier.of("my_catalog", "my_database", "my_table");
@@ -59,24 +62,41 @@ public class StoreSinkTest {
private final RowType rowType = RowType.of(new IntType(), new IntType(),
new IntType());
private TestFileStore fileStore;
+ private int[] primaryKeys;
+ private int[] partitions;
- public StoreSinkTest(boolean hasPk) {
+ public StoreSinkTest(boolean hasPk, boolean partitioned) {
this.hasPk = hasPk;
+ this.partitioned = partitioned;
}
@Before
public void before() {
- fileStore = new TestFileStore(hasPk);
+ primaryKeys = hasPk ? new int[] {1} : new int[0];
+ partitions = partitioned ? new int[] {0} : new int[0];
+ RowType keyType = hasPk ? RowType.of(new IntType()) : rowType;
+ RowType valueType =
+ hasPk
+ ? rowType
+ : new RowType(
+ Collections.singletonList(
+ new RowType.RowField("COUNT", new
BigIntType(false))));
+ RowType partitionType = partitioned ? RowType.of(new IntType()) :
RowType.of();
+ fileStore = new TestFileStore(hasPk, keyType, valueType,
partitionType);
}
- @Parameterized.Parameters(name = "hasPk-{0}")
- public static List<Boolean> data() {
- return Arrays.asList(true, false);
+ @Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
+ public static List<Boolean[]> data() {
+ return Arrays.asList(
+ new Boolean[] {true, true},
+ new Boolean[] {true, false},
+ new Boolean[] {false, false},
+ new Boolean[] {false, true});
}
@Test
public void testChangelogs() throws Exception {
- Assume.assumeTrue(hasPk);
+ Assume.assumeTrue(hasPk && partitioned);
StoreSink<?, ?> sink = newSink(null);
writeAndCommit(
sink,
@@ -94,14 +114,13 @@ public class StoreSinkTest {
@Test
public void testNoKeyChangelogs() throws Exception {
- Assume.assumeTrue(!hasPk);
+ Assume.assumeTrue(!hasPk && partitioned);
StoreSink<?, ?> sink =
new StoreSink<>(
identifier,
fileStore,
- rowType,
- new int[] {0},
- new int[] {},
+ partitions,
+ primaryKeys,
2,
() -> lock,
new HashMap<>());
@@ -121,7 +140,7 @@ public class StoreSinkTest {
@Test
public void testAppend() throws Exception {
- Assume.assumeTrue(hasPk);
+ Assume.assumeTrue(hasPk && partitioned);
StoreSink<?, ?> sink = newSink(null);
writeAndAssert(sink);
@@ -134,7 +153,7 @@ public class StoreSinkTest {
@Test
public void testOverwrite() throws Exception {
- Assume.assumeTrue(hasPk);
+ Assume.assumeTrue(hasPk && partitioned);
StoreSink<?, ?> sink = newSink(new HashMap<>());
writeAndAssert(sink);
@@ -148,7 +167,7 @@ public class StoreSinkTest {
@Test
public void testOverwritePartition() throws Exception {
- Assume.assumeTrue(hasPk);
+ Assume.assumeTrue(hasPk && partitioned);
HashMap<String, String> partition = new HashMap<>();
partition.put("part", "0");
StoreSink<?, ?> sink = newSink(partition);
@@ -227,14 +246,7 @@ public class StoreSinkTest {
private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
return new StoreSink<>(
- identifier,
- fileStore,
- rowType,
- new int[] {0},
- new int[] {1},
- 2,
- () -> lock,
- overwritePartition);
+ identifier, fileStore, partitions, primaryKeys, 2, () -> lock,
overwritePartition);
}
private class TestLock implements CatalogLock {
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 2a05526..db7a3de 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -33,6 +33,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,11 +55,17 @@ public class TestFileStore implements FileStore {
public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles
= new HashMap<>();
public final boolean hasPk;
+ private final RowType keyType;
+ private final RowType valueType;
+ private final RowType partitionType;
public boolean expired = false;
- public TestFileStore(boolean hasPk) {
+ public TestFileStore(boolean hasPk, RowType keyType, RowType valueType,
RowType partitionType) {
this.hasPk = hasPk;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ this.partitionType = partitionType;
}
@Override
@@ -100,6 +107,21 @@ public class TestFileStore implements FileStore {
}
@Override
+ public RowType keyType() {
+ return keyType;
+ }
+
+ @Override
+ public RowType valueType() {
+ return valueType;
+ }
+
+ @Override
+ public RowType partitionType() {
+ return partitionType;
+ }
+
+ @Override
public FileStoreScan newScan() {
throw new UnsupportedOperationException();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index 532cc65..cc36435 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -23,6 +23,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreExpire;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
@@ -38,4 +39,10 @@ public interface FileStore extends Serializable {
FileStoreExpire newExpire();
FileStoreScan newScan();
+
+ RowType keyType();
+
+ RowType valueType();
+
+ RowType partitionType();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index d670ef6..2d96faf 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -151,4 +151,19 @@ public class FileStoreImpl implements FileStore {
return new FileStoreScanImpl(
partitionType, pathFactory(), manifestFileFactory(),
manifestListFactory());
}
+
+ @Override
+ public RowType keyType() {
+ return keyType;
+ }
+
+ @Override
+ public RowType valueType() {
+ return valueType;
+ }
+
+ @Override
+ public RowType partitionType() {
+ return partitionType;
+ }
}