This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d1a8b6f  [FLINK-26879] Fix that LogStartupMode.LATEST and LogStartupMode.FROM_TIMESTAMP don't work
d1a8b6f is described below

commit d1a8b6fe2113654f3ba107969b3373ae459b4c5d
Author: Jane Chan <[email protected]>
AuthorDate: Wed Mar 30 14:43:42 2022 +0800

    [FLINK-26879] Fix that LogStartupMode.LATEST and LogStartupMode.FROM_TIMESTAMP don't work
    
    This closes #65
---
 flink-table-store-connector/pom.xml                |   8 +-
 .../flink/table/store/connector/TableStore.java    |  37 +-
 .../store/connector/source/TableStoreSource.java   |   1 -
 .../store/connector/ReadWriteTableITCase.java      | 427 +++++++++++++++++----
 flink-table-store-kafka/pom.xml                    |   2 +-
 .../table/store/kafka/KafkaLogSourceProvider.java  |  10 +-
 6 files changed, 380 insertions(+), 105 deletions(-)
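
For readers scanning the wrapped patch below, the heart of the fix is TableStore.SourceBuilder#build(). TableStoreSource previously forced isHybrid = true whenever a log provider was present, so the hybrid file-then-log source was built regardless of the configured scan mode, and LATEST / FROM_TIMESTAMP never took effect. The patch removes the isHybrid flag and lets the startup mode drive source selection. As a reading aid, here is the patched method assembled from the hunks below (not a standalone snippet; the comments are added here for orientation):

    public Source<RowData, ?, ?> build() {
        if (isContinuous) {
            LogStartupMode startupMode = logOptions().get(SCAN);
            if (logSourceProvider == null) {
                // file store only: LATEST starts the continuous scan from the
                // latest snapshot instead of reading the full history first
                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
            } else {
                if (startupMode != LogStartupMode.FULL) {
                    // LATEST / FROM_TIMESTAMP: read the log store alone
                    return logSourceProvider.createSource(null);
                }
                // FULL: bounded file store snapshot, then hand over to the log store
                return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
                                buildFileSource(false, false))
                        .addSource(
                                new LogHybridSourceFactory(logSourceProvider),
                                Boundedness.CONTINUOUS_UNBOUNDED)
                        .build();
            }
        } else {
            return buildFileSource(false, false);
        }
    }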

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 536befb..dfde4ed 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -212,7 +212,7 @@ under the License.
             <!-- include 2.0 server for tests  -->
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_${scala.binary.version}</artifactId>
-            <version>2.4.1</version>
+            <version>2.8.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -239,6 +239,12 @@ under the License.
         </dependency>
 
         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.8.1</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
             <version>${testcontainers.version}</version>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 9374933..d222018 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -227,8 +227,6 @@ public class TableStore {
 
         private boolean isContinuous = false;
 
-        private boolean isHybrid = true;
-
         @Nullable private int[][] projectedFields;
 
         @Nullable private Predicate partitionPredicate;
@@ -257,11 +255,6 @@ public class TableStore {
             return this;
         }
 
-        public SourceBuilder withHybridMode(boolean isHybrid) {
-            this.isHybrid = isHybrid;
-            return this;
-        }
-
        public SourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
             this.logSourceProvider = logSourceProvider;
             return this;
@@ -271,20 +264,14 @@ public class TableStore {
             return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
         }
 
-        private FileStoreSource buildFileSource(boolean isContinuous) {
-            FileStore fileStore = buildFileStore();
-
-            boolean latestContinuous = false;
-            if (isContinuous) {
-                LogStartupMode startupMode = logOptions().get(SCAN);
-                latestContinuous = startupMode == LogStartupMode.LATEST;
-            }
+        private FileStoreSource buildFileSource(
+                boolean isContinuous, boolean continuousScanLatest) {
             return new FileStoreSource(
-                    fileStore,
+                    buildFileStore(),
                     primaryKeys.length == 0,
                     isContinuous,
                     discoveryIntervalMills(),
-                    latestContinuous,
+                    continuousScanLatest,
                     projectedFields,
                     partitionPredicate,
                     fieldPredicate);
@@ -292,22 +279,22 @@ public class TableStore {
 
         public Source<RowData, ?, ?> build() {
             if (isContinuous) {
+                LogStartupMode startupMode = logOptions().get(SCAN);
                 if (logSourceProvider == null) {
-                    return buildFileSource(true);
-                }
-
-                if (isHybrid) {
+                    return buildFileSource(true, startupMode == LogStartupMode.LATEST);
+                } else {
+                    if (startupMode != LogStartupMode.FULL) {
+                        return logSourceProvider.createSource(null);
+                    }
                    return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                    buildFileSource(false))
+                                    buildFileSource(false, false))
                             .addSource(
                                    new LogHybridSourceFactory(logSourceProvider),
                                     Boundedness.CONTINUOUS_UNBOUNDED)
                             .build();
-                } else {
-                    return logSourceProvider.createSource(null);
                 }
             } else {
-                return buildFileSource(false);
+                return buildFileSource(false, false);
             }
         }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 11da93e..8de30f6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -135,7 +135,6 @@ public class TableStoreSource
                 tableStore
                         .sourceBuilder()
                         .withContinuousMode(streaming)
-                        .withHybridMode(streaming && logSourceProvider != null)
                         .withLogSourceProvider(logSourceProvider)
                        .withProjection(projectFields)
                        .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 86ec5dc..3458529 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
@@ -29,6 +30,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -58,8 +60,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
 
     @Test
     public void testBatchWriteWithPartitionedRecordsWithPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(false, false, true, true);
-
         // input is dailyRates()
         List<Row> expectedRecords =
                 Arrays.asList(
@@ -70,7 +70,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
                         // part = 2022-01-02
                         changelogRow("+I", "Euro", 119L, "2022-01-02"));
         // test batch read
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        String managedTable = collectAndCheckBatchReadWrite(true, true, expectedRecords);
         checkFileStorePath(tEnv, managedTable);
 
         // test streaming read
@@ -91,22 +91,27 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         assertNoMoreRecords(streamIter);
 
         // batch read to check partition refresh
-        expectedRecords = new ArrayList<>(expectedRecords);
-        expectedRecords.remove(3);
-        expectedRecords.add(changelogRow("+I", "Euro", 100L, "2022-01-02"));
-        expectedRecords.add(changelogRow("+I", "Yen", 1L, "2022-01-02"));
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        collectAndCheck(
+                        tEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                // part = 2022-01-01
+                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                                changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                                changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                                // part = 2022-01-02
+                                changelogRow("+I", "Euro", 100L, "2022-01-02"),
+                                changelogRow("+I", "Yen", 1L, "2022-01-02")))
+                .close();
     }
 
     @Test
    public void testBatchWriteWithPartitionedRecordsWithoutPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(false, false, true, false);
-
         // input is dailyRates()
-        List<Row> expectedRecords = dailyRates();
 
         // test batch read
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        String managedTable = collectAndCheckBatchReadWrite(true, false, dailyRates());
         checkFileStorePath(tEnv, managedTable);
 
         // overwrite dynamic partition
@@ -135,15 +140,15 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
 
     @Test
    public void testBatchWriteWithNonPartitionedRecordsWithPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(false, false, false, true);
-
         // input is rates()
-        List<Row> expectedRecords =
-                Arrays.asList(
-                        changelogRow("+I", "US Dollar", 102L),
-                        changelogRow("+I", "Yen", 1L),
-                        changelogRow("+I", "Euro", 119L));
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        false,
+                        true,
+                        Arrays.asList(
+                                changelogRow("+I", "US Dollar", 102L),
+                                changelogRow("+I", "Yen", 1L),
+                                changelogRow("+I", "Euro", 119L)));
         checkFileStorePath(tEnv, managedTable);
 
         // overwrite the whole table
@@ -151,37 +156,35 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
                 managedTable,
                 Collections.emptyMap(),
                 Collections.singletonList(new String[] {"'Euro'", "100"}));
-        expectedRecords = new ArrayList<>(expectedRecords);
-        expectedRecords.clear();
-        expectedRecords.add(changelogRow("+I", "Euro", 100L));
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        collectAndCheck(
+                tEnv,
+                managedTable,
+                Collections.emptyMap(),
+                Collections.singletonList(changelogRow("+I", "Euro", 100L)));
     }
 
     @Test
    public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(false, false, false, false);
-
         // input is rates()
-        List<Row> expectedRecords = rates();
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        String managedTable = collectAndCheckBatchReadWrite(false, false, rates());
         checkFileStorePath(tEnv, managedTable);
     }
 
     @Test
    public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(true, true, true, true);
-
         // input is dailyRatesChangelogWithoutUB()
         // test hybrid read
-        List<Row> expectedRecords =
-                Arrays.asList(
-                        // part = 2022-01-01
-                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
-                        // part = 2022-01-02
-                        changelogRow("+I", "Euro", 119L, "2022-01-02"));
-        BlockingIterator<Row, Row> streamIter =
-                collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords);
+        Tuple2<String, BlockingIterator<Row, Row>> tuple =
+                collectAndCheckStreamingReadWriteWithoutClose(
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                // part = 2022-01-01
+                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                                // part = 2022-01-02
+                                changelogRow("+I", "Euro", 119L, "2022-01-02")));
+        String managedTable = tuple.f0;
         checkFileStorePath(tEnv, managedTable);
+        BlockingIterator<Row, Row> streamIter = tuple.f1;
 
         // overwrite partition 2022-01-02
         prepareEnvAndOverwrite(
@@ -210,64 +213,324 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
 
     @Test
    public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(true, false, true, true);
-
         // input is dailyRatesChangelogWithoutUB()
         // file store continuous read
         // will not merge, at least collect two records
-        List<Row> expectedRecords =
+        checkFileStorePath(
+                tEnv,
+                collectAndCheckStreamingReadWriteWithClose(
+                        false,
+                        true,
+                        true,
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                // part = 2022-01-01
+                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                                // part = 2022-01-02
+                                changelogRow("+I", "Euro", 119L, "2022-01-02"))));
+    }
+
+    @Test
+    public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception {
+        // input is dailyRatesChangelogWithUB()
+        // enable log store, file store bounded read with merge
+        checkFileStorePath(
+                tEnv,
+                collectAndCheckStreamingReadWriteWithClose(
+                        true,
+                        true,
+                        false,
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                // part = 2022-01-01
+                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                                // part = 2022-01-02
+                                changelogRow("+I", "Euro", 115L, "2022-01-02"))));
+    }
+
+    @Test
+    public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception {
+        // input is ratesChangelogWithoutUB()
+        // enable log store, file store bounded read with merge
+        checkFileStorePath(
+                tEnv,
+                collectAndCheckStreamingReadWriteWithClose(
+                        true,
+                        false,
+                        true,
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                changelogRow("+I", "US Dollar", 102L),
+                                changelogRow("+I", "Euro", 119L))));
+    }
+
+    @Test
+    public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception {
+        // input is ratesChangelogWithUB()
+        // enable log store, with default full scan mode, will merge
+        checkFileStorePath(
+                tEnv,
+                collectAndCheckStreamingReadWriteWithClose(
+                        true,
+                        false,
+                        false,
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                changelogRow("+I", "US Dollar", 102L),
+                                changelogRow("+I", "Euro", 119L))));
+    }
+
+    @Test
+    public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception {
+        // input is dailyRatesChangelogWithoutUB()
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
                 Arrays.asList(
                         // part = 2022-01-01
                         changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-U", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+U", "Euro", 116L, "2022-01-01"),
+                        changelogRow("-D", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-D", "Euro", 116L, "2022-01-01"),
                         // part = 2022-01-02
-                        changelogRow("+I", "Euro", 119L, "2022-01-02"));
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+                        changelogRow("+I", "Euro", 119L, "2022-01-02")));
     }
 
     @Test
-    public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(true, true, true, false);
-
+    public void testReadLatestChangelogOfPartitionedRecordsWithoutPk() throws Exception {
         // input is dailyRatesChangelogWithUB()
-        // enable log store, file store bounded read with merge
-        List<Row> expectedRecords =
+        collectLatestLogAndCheck(
+                false,
+                true,
+                false,
                 Arrays.asList(
                         // part = 2022-01-01
                         changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 116L, "2022-01-01"),
+                        changelogRow("-D", "Euro", 116L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-D", "Yen", 1L, "2022-01-01"),
                         // part = 2022-01-02
-                        changelogRow("+I", "Euro", 115L, "2022-01-02"));
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+                        changelogRow("+I", "Euro", 114L, "2022-01-02"),
+                        changelogRow("-D", "Euro", 114L, "2022-01-02"),
+                        changelogRow("+I", "Euro", 119L, "2022-01-02"),
+                        changelogRow("-D", "Euro", 119L, "2022-01-02"),
+                        changelogRow("+I", "Euro", 115L, "2022-01-02")));
     }
 
     @Test
-    public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(true, true, false, true);
-
+    public void testReadLatestChangelogOfNonPartitionedRecordsWithPk() throws Exception {
         // input is ratesChangelogWithoutUB()
-        // enable log store, file store bounded read with merge
-        List<Row> expectedRecords =
+        collectLatestLogAndCheck(
+                false,
+                false,
+                true,
                 Arrays.asList(
-                        changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L));
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Yen", 1L),
+                        changelogRow("-U", "Euro", 114L),
+                        changelogRow("+U", "Euro", 116L),
+                        changelogRow("-D", "Euro", 116L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("-D", "Yen", 1L)));
     }
 
     @Test
-    public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception {
-        String managedTable = prepareEnvAndWrite(true, true, false, false);
-
+    public void testReadLatestChangelogOfNonPartitionedRecordsWithoutPk() throws Exception {
         // input is ratesChangelogWithUB()
-        // enable log store, with default full scan mode, will merge
-        List<Row> expectedRecords =
+        collectLatestLogAndCheck(
+                false,
+                false,
+                false,
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Yen", 1L),
+                        changelogRow("-D", "Euro", 114L),
+                        changelogRow("+I", "Euro", 116L),
+                        changelogRow("-D", "Euro", 116L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("-D", "Euro", 119L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("-D", "Yen", 1L)));
+    }
+
+    @Test
+    public void testReadLatestChangelogOfInsertOnlyRecords() throws Exception {
+        // input is rates()
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Yen", 1L),
+                        changelogRow("-U", "Euro", 114L),
+                        changelogRow("+U", "Euro", 119L));
+
+        // currency as pk
+        collectLatestLogAndCheck(true, false, true, expected);
+
+        // without pk
+        collectLatestLogAndCheck(true, false, true, expected);
+    }
+
+    @Test
+    public void testReadInsertOnlyChangelogFromTimestamp() throws Exception {
+        // input is dailyRates()
+        collectChangelogFromTimestampAndCheck(
+                true,
+                true,
+                true,
+                0,
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 119L, "2022-01-02")));
+
+        collectChangelogFromTimestampAndCheck(true, true, false, 0, dailyRates());
+
+        // input is rates()
+        collectChangelogFromTimestampAndCheck(
+                true,
+                false,
+                true,
+                0,
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Yen", 1L),
+                        changelogRow("-U", "Euro", 114L),
+                        changelogRow("+U", "Euro", 119L)));
+
+        collectChangelogFromTimestampAndCheck(true, false, false, 0, rates());
+
+        collectChangelogFromTimestampAndCheck(
+                true, false, false, Long.MAX_VALUE - 1, Collections.emptyList());
+    }
+
+    @Test
+    public void testReadRetractChangelogFromTimestamp() throws Exception {
+        // input is dailyRatesChangelogWithUB()
+        collectChangelogFromTimestampAndCheck(
+                false,
+                true,
+                false,
+                0,
                 Arrays.asList(
-                        changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L));
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 116L, "2022-01-01"),
+                        changelogRow("-D", "Euro", 116L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-D", "Yen", 1L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 114L, "2022-01-02"),
+                        changelogRow("-D", "Euro", 114L, "2022-01-02"),
+                        changelogRow("+I", "Euro", 119L, "2022-01-02"),
+                        changelogRow("-D", "Euro", 119L, "2022-01-02"),
+                        changelogRow("+I", "Euro", 115L, "2022-01-02")));
 
-        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        // input is dailyRatesChangelogWithoutUB()
+        collectChangelogFromTimestampAndCheck(
+                false,
+                true,
+                true,
+                0,
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-U", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+U", "Euro", 116L, "2022-01-01"),
+                        changelogRow("-D", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-D", "Euro", 116L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 119L, "2022-01-02")));
     }
 
     // ------------------------ Tools ----------------------------------
 
-    private String prepareEnvAndWrite(
-            boolean streaming, boolean enableLogStore, boolean partitioned, boolean hasPk)
+    private String collectAndCheckBatchReadWrite(
+            boolean partitioned, boolean hasPk, List<Row> expected) throws Exception {
+        return collectAndCheckUnderSameEnv(
+                        false,
+                        false,
+                        true,
+                        partitioned,
+                        hasPk,
+                        true,
+                        Collections.emptyMap(),
+                        expected)
+                .f0;
+    }
+
+    private String collectAndCheckStreamingReadWriteWithClose(
+            boolean enableLogStore,
+            boolean partitioned,
+            boolean hasPk,
+            Map<String, String> readHints,
+            List<Row> expected)
+            throws Exception {
+        Tuple2<String, BlockingIterator<Row, Row>> tuple =
+                collectAndCheckUnderSameEnv(
+                        true, enableLogStore, false, partitioned, hasPk, true, readHints, expected);
+        tuple.f1.close();
+        return tuple.f0;
+    }
+
+    private Tuple2<String, BlockingIterator<Row, Row>>
+            collectAndCheckStreamingReadWriteWithoutClose(
+                    Map<String, String> readHints, List<Row> expected) throws Exception {
+        return collectAndCheckUnderSameEnv(
+                true, true, false, true, true, true, readHints, expected);
+    }
+
+    private void collectLatestLogAndCheck(
+            boolean insertOnly, boolean partitioned, boolean hasPk, List<Row> expected)
+            throws Exception {
+        Map<String, String> hints = new HashMap<>();
+        hints.put(
+                LOG_PREFIX + LogOptions.SCAN.key(),
+                LogOptions.LogStartupMode.LATEST.name().toLowerCase());
+        collectAndCheckUnderSameEnv(
+                        true, true, insertOnly, partitioned, hasPk, false, hints, expected)
+                .f1
+                .close();
+    }
+
+    private void collectChangelogFromTimestampAndCheck(
+            boolean insertOnly,
+            boolean partitioned,
+            boolean hasPk,
+            long timestamp,
+            List<Row> expected)
+            throws Exception {
+        Map<String, String> hints = new HashMap<>();
+        hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp");
+        hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+        collectAndCheckUnderSameEnv(
+                        true, true, insertOnly, partitioned, hasPk, true, hints, expected)
+                .f1
+                .close();
+    }
+
+    private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv(
+            boolean streaming,
+            boolean enableLogStore,
+            boolean insertOnly,
+            boolean partitioned,
+            boolean hasPk,
+            boolean writeFirst,
+            Map<String, String> readHints,
+            List<Row> expected)
             throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
@@ -282,7 +545,11 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         String helperTableDdl;
         if (streaming) {
             helperTableDdl =
-                    prepareHelperSourceWithChangelogRecords(sourceTable, partitioned, hasPk);
+                    insertOnly
+                            ? prepareHelperSourceWithInsertOnlyRecords(
+                                    sourceTable, partitioned, hasPk)
+                            : prepareHelperSourceWithChangelogRecords(
+                                    sourceTable, partitioned, hasPk);
             env = buildStreamEnv();
             builder.inStreamingMode();
         } else {
@@ -292,13 +559,29 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
             builder.inBatchMode();
         }
        String managedTableDdl = prepareManagedTableDdl(sourceTable, managedTable, tableOptions);
-        String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
 
         tEnv = StreamTableEnvironment.create(env, builder.build());
         tEnv.executeSql(helperTableDdl);
         tEnv.executeSql(managedTableDdl);
-        tEnv.executeSql(insertQuery).await();
-        return managedTable;
+
+        String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
+        String selectQuery = prepareSimpleSelectQuery(managedTable, readHints);
+
+        BlockingIterator<Row, Row> iterator;
+        if (writeFirst) {
+            tEnv.executeSql(insertQuery).await();
+            iterator = BlockingIterator.of(tEnv.executeSql(selectQuery).collect());
+        } else {
+            iterator = BlockingIterator.of(tEnv.executeSql(selectQuery).collect());
+            tEnv.executeSql(insertQuery).await();
+        }
+        if (expected.isEmpty()) {
+            assertNoMoreRecords(iterator);
+        } else {
+            assertThat(iterator.collect(expected.size(), 10, TimeUnit.SECONDS))
+                    .containsExactlyInAnyOrderElementsOf(expected);
+        }
+        return Tuple2.of(managedTable, iterator);
     }
 
     private void prepareEnvAndOverwrite(
@@ -465,7 +748,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
 
     private static String buildHints(Map<String, String> hints) {
         if (hints.size() > 0) {
-            StringBuilder hintsBuilder = new StringBuilder("/* + OPTIONS (");
+            StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS (");
             hints.forEach(
                     (k, v) -> {
                         hintsBuilder.append(String.format("'%s'", k));
@@ -473,7 +756,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
                         hintsBuilder.append(String.format("'%s', ", v));
                     });
             int len = hintsBuilder.length();
-            hintsBuilder.deleteCharAt(len - 1);
+            hintsBuilder.deleteCharAt(len - 2);
             hintsBuilder.append(") */");
             return hintsBuilder.toString();
         }
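
One detail in the buildHints change above is easy to miss: Flink (via Calcite) only recognizes a comment starting with "/*+" as a hint, so the old "/* + OPTIONS (" prefix produced an ordinary block comment and the read options silently never reached the source. Also, each entry is appended as 'k' = 'v' followed by ", ", which leaves the dangling comma at index length - 2, not length - 1. A minimal sketch of the corrected construction, with placeholder key/value rather than real option names:

    // builds the string: /*+ OPTIONS ('k' = 'v' ) */
    StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS (");
    hintsBuilder.append("'k'").append(" = ").append("'v', ");
    // the last two characters are ", "; delete the comma, keep the space
    hintsBuilder.deleteCharAt(hintsBuilder.length() - 2);
    hintsBuilder.append(") */");
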
diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml
index 22787ae..04b8e8a 100644
--- a/flink-table-store-kafka/pom.xml
+++ b/flink-table-store-kafka/pom.xml
@@ -110,7 +110,7 @@ under the License.
             <!-- include 2.0 server for tests  -->
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_${scala.binary.version}</artifactId>
-            <version>2.4.1</version>
+            <version>2.8.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index d504802..60308a0 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -52,7 +52,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
 
     private final int[] primaryKey;
 
-    @Nullable private final DeserializationSchema<RowData> keyDeserializer;
+    @Nullable private final DeserializationSchema<RowData> primaryKeyDeserializer;
 
     private final DeserializationSchema<RowData> valueDeserializer;
 
@@ -69,7 +69,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
             Properties properties,
             DataType physicalType,
             int[] primaryKey,
-            @Nullable DeserializationSchema<RowData> keyDeserializer,
+            @Nullable DeserializationSchema<RowData> primaryKeyDeserializer,
             DeserializationSchema<RowData> valueDeserializer,
             @Nullable int[][] projectFields,
             LogConsistency consistency,
@@ -79,7 +79,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
         this.properties = properties;
         this.physicalType = physicalType;
         this.primaryKey = primaryKey;
-        this.keyDeserializer = keyDeserializer;
+        this.primaryKeyDeserializer = primaryKeyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.projectFields = projectFields;
         this.consistency = consistency;
@@ -95,7 +95,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
                properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
                 break;
             case EVENTUAL:
-                if (keyDeserializer == null) {
+                if (primaryKeyDeserializer == null) {
                     throw new IllegalArgumentException(
                             "Can not use EVENTUAL consistency mode for non-pk 
table.");
                 }
@@ -117,7 +117,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
                 new KafkaLogDeserializationSchema(
                         physicalType,
                         primaryKey,
-                        keyDeserializer,
+                        primaryKeyDeserializer,
                         valueDeserializer,
                         projectFields));
     }
