This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 2765184  [hotfix] Let FileStore assign key/value/partition type directly to StoreSink to avoid serialization error caused by incorrect projection
2765184 is described below

commit 27651841123e0bf82a376788657d28e94e260127
Author: Jane Chan <[email protected]>
AuthorDate: Thu Mar 3 22:02:13 2022 +0800

    [hotfix] Let FileStore assign key/value/partition type directly to StoreSink to avoid serialization error caused by incorrect projection
    
    This closes #35
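
    In short, FileStore now exposes keyType()/valueType()/partitionType(), and
    StoreSink and FileStoreSource read those row types instead of re-deriving
    them with ProjectionUtils.project over a RowType and index arrays. A minimal,
    self-contained sketch of that pattern, using simplified stand-in names
    (TypeOwnershipSketch, Store, project are illustrative only, not the
    project's API):

        import java.util.Arrays;
        import java.util.List;
        import java.util.stream.Collectors;

        public class TypeOwnershipSketch {

            /** Stand-in for FileStore: the single owner of key/value/partition row types. */
            interface Store {
                List<String> keyType();

                List<String> valueType();

                List<String> partitionType();
            }

            /** The old, error-prone derivation: project the sink's row type by index. */
            static List<String> project(List<String> rowType, int[] fields) {
                return Arrays.stream(fields).mapToObj(rowType::get).collect(Collectors.toList());
            }

            public static void main(String[] args) {
                List<String> rowType = Arrays.asList("pt STRING", "k INT", "v BIGINT");

                Store store = new Store() {
                    public List<String> keyType() { return Arrays.asList("k INT"); }

                    public List<String> valueType() { return rowType; }

                    public List<String> partitionType() { return Arrays.asList("pt STRING"); }
                };

                // Old: serializer types came from index projections; a wrong index array
                // silently produced types that did not match what the store wrote.
                System.out.println("projected key = " + project(rowType, new int[] {1}));

                // New: serializer types come straight from the store itself.
                System.out.println("store key     = " + store.keyType());
            }
        }

    The same inversion is what removes the rowType field and the
    ProjectionUtils.project calls from StoreSink and FileStoreSource in the
    diff below.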
---
 .../table/store/connector/sink/StoreSink.java      | 17 +++----
 .../store/connector/source/FileStoreSource.java    | 24 +++-------
 .../table/store/connector/FileStoreITCase.java     | 23 +++------
 .../table/store/connector/sink/StoreSinkTest.java  | 54 +++++++++++++---------
 .../table/store/connector/sink/TestFileStore.java  | 24 +++++++++-
 .../apache/flink/table/store/file/FileStore.java   |  7 +++
 .../flink/table/store/file/FileStoreImpl.java      | 15 ++++++
 7 files changed, 98 insertions(+), 66 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index fdd3cbe..9ae59fd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.sink.SinkRecordConverter;
-import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
@@ -39,8 +38,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
-import static org.apache.flink.table.store.utils.ProjectionUtils.project;
-
 /** {@link Sink} of dynamic store. */
 public class StoreSink<WriterStateT, LogCommT>
         implements StatefulSink<RowData, WriterStateT>,
@@ -52,8 +49,6 @@ public class StoreSink<WriterStateT, LogCommT>
 
     private final FileStore fileStore;
 
-    private final RowType rowType;
-
     private final int[] partitions;
 
     private final int[] primaryKeys;
@@ -67,7 +62,6 @@ public class StoreSink<WriterStateT, LogCommT>
     public StoreSink(
             ObjectIdentifier tableIdentifier,
             FileStore fileStore,
-            RowType rowType,
             int[] partitions,
             int[] primaryKeys,
             int numBucket,
@@ -75,7 +69,6 @@ public class StoreSink<WriterStateT, LogCommT>
             @Nullable Map<String, String> overwritePartition) {
         this.tableIdentifier = tableIdentifier;
         this.fileStore = fileStore;
-        this.rowType = rowType;
         this.partitions = partitions;
         this.primaryKeys = primaryKeys;
         this.numBucket = numBucket;
@@ -93,7 +86,11 @@ public class StoreSink<WriterStateT, LogCommT>
             InitContext initContext, Collection<WriterStateT> states) throws IOException {
         return new StoreSinkWriter<>(
                 fileStore.newWrite(),
-                new SinkRecordConverter(numBucket, rowType, partitions, primaryKeys),
+                new SinkRecordConverter(
+                        numBucket,
+                        primaryKeys.length > 0 ? fileStore.valueType() : fileStore.keyType(),
+                        partitions,
+                        primaryKeys),
                 fileCommitSerializer(),
                 overwritePartition != null);
     }
@@ -136,14 +133,14 @@ public class StoreSink<WriterStateT, LogCommT>
     public GlobalCommittableSerializer<LogCommT> getGlobalCommittableSerializer() {
         ManifestCommittableSerializer fileCommSerializer =
                 new ManifestCommittableSerializer(
-                        project(rowType, partitions), project(rowType, primaryKeys), rowType);
+                        fileStore.partitionType(), fileStore.keyType(), fileStore.valueType());
         SimpleVersionedSerializer<LogCommT> logCommitSerializer = new NoOutputSerializer<>();
         return new GlobalCommittableSerializer<>(logCommitSerializer, fileCommSerializer);
     }
 
     private FileCommittableSerializer fileCommitSerializer() {
         return new FileCommittableSerializer(
-                project(rowType, partitions), project(rowType, primaryKeys), rowType);
+                fileStore.partitionType(), fileStore.keyType(), fileStore.valueType());
     }
 
     private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 062a49f..3997d37 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -29,11 +29,9 @@ import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import static org.apache.flink.table.store.utils.ProjectionUtils.project;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** {@link Source} of file store. */
@@ -44,11 +42,7 @@ public class FileStoreSource
 
     private final FileStore fileStore;
 
-    private final RowType rowType;
-
-    private final int[] partitions;
-
-    private final int[] primaryKeys;
+    private final boolean keyAsRecord;
 
     @Nullable private final int[][] projectedFields;
 
@@ -58,16 +52,12 @@ public class FileStoreSource
 
     public FileStoreSource(
             FileStore fileStore,
-            RowType rowType,
-            int[] partitions,
-            int[] primaryKeys,
+            boolean keyAsRecord,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
             @Nullable Predicate fieldsPredicate) {
         this.fileStore = fileStore;
-        this.rowType = rowType;
-        this.partitions = partitions;
-        this.primaryKeys = primaryKeys;
+        this.keyAsRecord = keyAsRecord;
         this.projectedFields = projectedFields;
         this.partitionPredicate = partitionPredicate;
         this.fieldsPredicate = fieldsPredicate;
@@ -83,13 +73,13 @@ public class FileStoreSource
     public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
         FileStoreRead read = fileStore.newRead();
         if (projectedFields != null) {
-            if (primaryKeys.length == 0) {
+            if (keyAsRecord) {
                 read.withKeyProjection(projectedFields);
             } else {
                 read.withValueProjection(projectedFields);
             }
         }
-        return new FileStoreSourceReader(context, read, primaryKeys.length == 0);
+        return new FileStoreSourceReader(context, read, keyAsRecord);
     }
 
     @Override
@@ -100,7 +90,7 @@ public class FileStoreSource
             scan.withPartitionFilter(partitionPredicate);
         }
         if (fieldsPredicate != null) {
-            if (primaryKeys.length == 0) {
+            if (keyAsRecord) {
                 scan.withKeyFilter(fieldsPredicate);
             } else {
                 scan.withValueFilter(fieldsPredicate);
@@ -120,7 +110,7 @@ public class FileStoreSource
     @Override
     public FileStoreSourceSplitSerializer getSplitSerializer() {
         return new FileStoreSourceSplitSerializer(
-                project(rowType, partitions), project(rowType, primaryKeys), rowType);
+                fileStore.partitionType(), fileStore.keyType(), fileStore.valueType());
     }
 
     @Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index f278086..28f2f8f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -150,7 +150,7 @@ public class FileStoreITCase extends AbstractTestBase {
         write(finiteSource, fileStore, true, overwrite);
 
         // read
-        List<Row> results = read(env, fileStore, true);
+        List<Row> results = read(env, fileStore);
 
         Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
         assertThat(results).containsExactlyInAnyOrder(expected);
@@ -164,7 +164,7 @@ public class FileStoreITCase extends AbstractTestBase {
         write(finiteSource, fileStore, true, new HashMap<>());
 
         // read
-        results = read(env, fileStore, true);
+        results = read(env, fileStore);
         expected = new Row[] {Row.of(19, "p2", 6)};
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
@@ -180,7 +180,7 @@ public class FileStoreITCase extends AbstractTestBase {
         write(finiteSource, fileStore, partitioned);
 
         // source
-        List<Row> results = read(env, fileStore, partitioned);
+        List<Row> results = read(env, fileStore);
 
         Row[] expected =
                 partitioned
@@ -255,26 +255,15 @@ public class FileStoreITCase extends AbstractTestBase {
         int[] keys = new int[] {2};
         StoreSink<?, ?> sink =
                 new StoreSink<>(
-                        null,
-                        fileStore,
-                        VALUE_TYPE,
-                        partitions,
-                        keys,
-                        NUM_BUCKET,
-                        null,
-                        overwritePartition);
+                        null, fileStore, partitions, keys, NUM_BUCKET, null, overwritePartition);
         input = input.keyBy(row -> row.getInt(2)); // key by
         GlobalCommittingSinkTranslator.translate(input, sink);
         input.getExecutionEnvironment().execute();
     }
 
-    public static List<Row> read(
-            StreamExecutionEnvironment env, FileStore fileStore, boolean partitioned)
+    public static List<Row> read(StreamExecutionEnvironment env, FileStore fileStore)
             throws Exception {
-        int[] partitions = partitioned ? new int[] {1} : new int[0];
-        int[] keys = new int[] {2};
-        FileStoreSource source =
-                new FileStoreSource(fileStore, VALUE_TYPE, partitions, keys, null, null, null);
+        FileStoreSource source = new FileStoreSource(fileStore, false, null, null, null);
         CloseableIterator<RowData> iterator =
                 env.fromSource(
                                 source,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 05ad999..fd3d97f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
@@ -51,6 +52,8 @@ public class StoreSinkTest {
 
     private final boolean hasPk;
 
+    private final boolean partitioned;
+
     private final ObjectIdentifier identifier =
             ObjectIdentifier.of("my_catalog", "my_database", "my_table");
 
@@ -59,24 +62,41 @@ public class StoreSinkTest {
     private final RowType rowType = RowType.of(new IntType(), new IntType(), new IntType());
 
     private TestFileStore fileStore;
+    private int[] primaryKeys;
+    private int[] partitions;
 
-    public StoreSinkTest(boolean hasPk) {
+    public StoreSinkTest(boolean hasPk, boolean partitioned) {
         this.hasPk = hasPk;
+        this.partitioned = partitioned;
     }
 
     @Before
     public void before() {
-        fileStore = new TestFileStore(hasPk);
+        primaryKeys = hasPk ? new int[] {1} : new int[0];
+        partitions = partitioned ? new int[] {0} : new int[0];
+        RowType keyType = hasPk ? RowType.of(new IntType()) : rowType;
+        RowType valueType =
+                hasPk
+                        ? rowType
+                        : new RowType(
+                                Collections.singletonList(
+                                        new RowType.RowField("COUNT", new BigIntType(false))));
+        RowType partitionType = partitioned ? RowType.of(new IntType()) : RowType.of();
+        fileStore = new TestFileStore(hasPk, keyType, valueType, partitionType);
     }
 
-    @Parameterized.Parameters(name = "hasPk-{0}")
-    public static List<Boolean> data() {
-        return Arrays.asList(true, false);
+    @Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
+    public static List<Boolean[]> data() {
+        return Arrays.asList(
+                new Boolean[] {true, true},
+                new Boolean[] {true, false},
+                new Boolean[] {false, false},
+                new Boolean[] {false, true});
     }
 
     @Test
     public void testChangelogs() throws Exception {
-        Assume.assumeTrue(hasPk);
+        Assume.assumeTrue(hasPk && partitioned);
         StoreSink<?, ?> sink = newSink(null);
         writeAndCommit(
                 sink,
@@ -94,14 +114,13 @@ public class StoreSinkTest {
 
     @Test
     public void testNoKeyChangelogs() throws Exception {
-        Assume.assumeTrue(!hasPk);
+        Assume.assumeTrue(!hasPk && partitioned);
         StoreSink<?, ?> sink =
                 new StoreSink<>(
                         identifier,
                         fileStore,
-                        rowType,
-                        new int[] {0},
-                        new int[] {},
+                        partitions,
+                        primaryKeys,
                         2,
                         () -> lock,
                         new HashMap<>());
@@ -121,7 +140,7 @@ public class StoreSinkTest {
 
     @Test
     public void testAppend() throws Exception {
-        Assume.assumeTrue(hasPk);
+        Assume.assumeTrue(hasPk && partitioned);
         StoreSink<?, ?> sink = newSink(null);
         writeAndAssert(sink);
 
@@ -134,7 +153,7 @@ public class StoreSinkTest {
 
     @Test
     public void testOverwrite() throws Exception {
-        Assume.assumeTrue(hasPk);
+        Assume.assumeTrue(hasPk && partitioned);
         StoreSink<?, ?> sink = newSink(new HashMap<>());
         writeAndAssert(sink);
 
@@ -148,7 +167,7 @@ public class StoreSinkTest {
 
     @Test
     public void testOverwritePartition() throws Exception {
-        Assume.assumeTrue(hasPk);
+        Assume.assumeTrue(hasPk && partitioned);
         HashMap<String, String> partition = new HashMap<>();
         partition.put("part", "0");
         StoreSink<?, ?> sink = newSink(partition);
@@ -227,14 +246,7 @@ public class StoreSinkTest {
 
     private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
         return new StoreSink<>(
-                identifier,
-                fileStore,
-                rowType,
-                new int[] {0},
-                new int[] {1},
-                2,
-                () -> lock,
-                overwritePartition);
+                identifier, fileStore, partitions, primaryKeys, 2, () -> lock, overwritePartition);
     }
 
     private class TestLock implements CatalogLock {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 2a05526..db7a3de 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,11 +55,17 @@ public class TestFileStore implements FileStore {
     public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
 
     public final boolean hasPk;
+    private final RowType keyType;
+    private final RowType valueType;
+    private final RowType partitionType;
 
     public boolean expired = false;
 
-    public TestFileStore(boolean hasPk) {
+    public TestFileStore(boolean hasPk, RowType keyType, RowType valueType, RowType partitionType) {
         this.hasPk = hasPk;
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.partitionType = partitionType;
     }
 
     @Override
@@ -100,6 +107,21 @@ public class TestFileStore implements FileStore {
     }
 
     @Override
+    public RowType keyType() {
+        return keyType;
+    }
+
+    @Override
+    public RowType valueType() {
+        return valueType;
+    }
+
+    @Override
+    public RowType partitionType() {
+        return partitionType;
+    }
+
+    @Override
     public FileStoreScan newScan() {
         throw new UnsupportedOperationException();
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index 532cc65..cc36435 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.operation.FileStoreExpire;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
 
@@ -38,4 +39,10 @@ public interface FileStore extends Serializable {
     FileStoreExpire newExpire();
 
     FileStoreScan newScan();
+
+    RowType keyType();
+
+    RowType valueType();
+
+    RowType partitionType();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index d670ef6..2d96faf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -151,4 +151,19 @@ public class FileStoreImpl implements FileStore {
         return new FileStoreScanImpl(
                 partitionType, pathFactory(), manifestFileFactory(), manifestListFactory());
     }
+
+    @Override
+    public RowType keyType() {
+        return keyType;
+    }
+
+    @Override
+    public RowType valueType() {
+        return valueType;
+    }
+
+    @Override
+    public RowType partitionType() {
+        return partitionType;
+    }
 }
