This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c36c01  [FLINK-26860] FileStore continuous bug for delete row kind records
0c36c01 is described below

commit 0c36c014a6df25a95074a4b4630c495a1d3a488c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 25 16:00:26 2022 +0800

    [FLINK-26860] FileStore continuous bug for delete row kind records
    
    This closes #61
---
 .../source/FileStoreSourceSplitReader.java         | 15 ++++++++++---
 .../store/connector/ReadWriteTableITCase.java      | 25 ++--------------------
 .../table/store/file/utils/BlockingIterator.java   |  3 +--
 3 files changed, 15 insertions(+), 28 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 9361615..1ce705c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -188,7 +188,7 @@ public class FileStoreSourceSplitReader
 
     private abstract class FileStoreRecordIterator implements BulkFormat.RecordIterator<RowData> {
 
-        protected RecordReader.RecordIterator iterator;
+        private RecordReader.RecordIterator iterator;
 
         protected final MutableRecordAndPosition<RowData> recordAndPosition =
                 new MutableRecordAndPosition<>();
@@ -199,6 +199,15 @@ public class FileStoreSourceSplitReader
             return this;
         }
 
+        protected KeyValue nextKeyValue() throws IOException {
+            // The RowData is reused in iterator, we should set back to insert kind
+            if (recordAndPosition.getRecord() != null) {
+                recordAndPosition.getRecord().setRowKind(RowKind.INSERT);
+            }
+
+            return iterator.next();
+        }
+
         @Override
         public void releaseBatch() {
             this.iterator.releaseBatch();
@@ -212,7 +221,7 @@ public class FileStoreSourceSplitReader
         @Override
         public RecordAndPosition<RowData> next() {
             try {
-                KeyValue kv = iterator.next();
+                KeyValue kv = nextKeyValue();
                 if (kv == null) {
                     return null;
                 }
@@ -244,7 +253,7 @@ public class FileStoreSourceSplitReader
         public RecordAndPosition<RowData> next() {
             try {
                 if (count == 0) {
-                    KeyValue kv = iterator.next();
+                    KeyValue kv = nextKeyValue();
                     if (kv == null) {
                         return null;
                     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 7b6d648..98c2682 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.types.Row;
 
 import org.junit.Ignore;
@@ -111,16 +110,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
         checkFileStorePath(tEnv, managedTable);
 
-        // test batch read with latest-scan
-        collectAndCheck(
-                        tEnv,
-                        managedTable,
-                        Collections.singletonMap(
-                                LogOptions.SCAN.key(),
-                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
-                        Collections.emptyList())
-                .close();
-
         // overwrite dynamic partition
         prepareEnvAndOverwrite(
                 managedTable,
@@ -221,21 +210,10 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         assertNoMoreRecords(streamIter);
     }
 
-    @Ignore("file store continuous read is failed, actual has size 1")
     @Test
     public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
         String managedTable = prepareEnvAndWrite(true, false, true, true);
 
-        // disable log store and read from latest
-        collectAndCheck(
-                        tEnv,
-                        managedTable,
-                        Collections.singletonMap(
-                                LogOptions.SCAN.key(),
-                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
-                        Collections.emptyList())
-                .close();
-
         // input is dailyRatesChangelogWithoutUB()
         // file store continuous read
         // will not merge, at least collect two records
@@ -386,7 +364,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         List<Row> expectedRecords = Collections.emptyList();
         try {
             // set expectation size to 1 to let time pass by until timeout
-            expectedRecords = iterator.collect(1, 1L, TimeUnit.MINUTES);
+            // just wait 5s to avoid too long time
+            expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
             iterator.close();
         } catch (Exception ignored) {
             // don't throw exception
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
index 7e973aa..b59791b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.utils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -88,7 +87,7 @@ public class BlockingIterator<IN, OUT> implements AutoCloseable {
 
     private List<OUT> doCollect(int limit) {
         if (limit == 0) {
-            return Collections.emptyList();
+            throw new RuntimeException("Collect zero record is meaningless.");
         }
 
         List<OUT> result = new ArrayList<>();

Reply via email to