This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 22100a7  [FLINK-26857] Add ITCase for projection and filter predicate
22100a7 is described below

commit 22100a7493da6c41953f9020e84df2c1e6a6e94d
Author: Jane Chan <[email protected]>
AuthorDate: Fri Apr 1 18:34:20 2022 +0800

    [FLINK-26857] Add ITCase for projection and filter predicate
    
    This closes #69
---
 .../store/connector/source/TableStoreSource.java   |   4 +-
 .../store/connector/ReadWriteTableITCase.java      | 835 ++++++++++++++++++++-
 .../store/kafka/KafkaLogDeserializationSchema.java |  12 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |   6 +-
 4 files changed, 813 insertions(+), 44 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 8de30f6..e1681fe 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -44,7 +44,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -165,8 +164,7 @@ public class TableStoreSource
             fieldFilters = filters;
         }
         return Result.of(
-                new ArrayList<>(filters),
-                fieldFilters == null ? Collections.emptyList() : fieldFilters);
+                filters, streaming && logStoreTableFactory != null ? filters : 
fieldFilters);
     }
 
     @Override
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 3458529..6ea0367 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -35,6 +35,8 @@ import org.apache.flink.types.Row;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,13 +66,15 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         List<Row> expectedRecords =
                 Arrays.asList(
                         // part = 2022-01-01
-                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                         changelogRow("+I", "Yen", 1L, "2022-01-01"),
                         changelogRow("+I", "Euro", 114L, "2022-01-01"),
                         // part = 2022-01-02
                         changelogRow("+I", "Euro", 119L, "2022-01-02"));
         // test batch read
-        String managedTable = collectAndCheckBatchReadWrite(true, true, 
expectedRecords);
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        true, true, null, Collections.emptyList(), 
expectedRecords);
         checkFileStorePath(tEnv, managedTable);
 
         // test streaming read
@@ -97,13 +101,80 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         Collections.emptyMap(),
                         Arrays.asList(
                                 // part = 2022-01-01
-                                changelogRow("+I", "US Dollar", 102L, 
"2022-01-01"),
+                                changelogRow("+I", "US Dollar", 114L, 
"2022-01-01"),
                                 changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                 changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                 // part = 2022-01-02
                                 changelogRow("+I", "Euro", 100L, "2022-01-02"),
                                 changelogRow("+I", "Yen", 1L, "2022-01-02")))
                 .close();
+
+        // test partition filter
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01"));
+        collectAndCheckBatchReadWrite(
+                true, true, "dt <> '2022-01-02'", Collections.emptyList(), 
expected);
+
+        collectAndCheckBatchReadWrite(
+                true, true, "dt = '2022-01-01'", Collections.emptyList(), 
expected);
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                true,
+                true,
+                "rate >= 100",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 119L, "2022-01-02")));
+
+        // test partition and field filter
+        collectAndCheckBatchReadWrite(
+                true,
+                true,
+                "rate >= 100 AND dt = '2022-01-02'",
+                Collections.emptyList(),
+                Collections.singletonList(changelogRow("+I", "Euro", 119L, 
"2022-01-02")));
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                true,
+                true,
+                null,
+                Collections.singletonList("dt"),
+                Arrays.asList(
+                        changelogRow("+I", "2022-01-01"), // US Dollar
+                        changelogRow("+I", "2022-01-01"), // Yen
+                        changelogRow("+I", "2022-01-01"), // Euro
+                        changelogRow("+I", "2022-01-02"))); // Euro
+
+        collectAndCheckBatchReadWrite(
+                true,
+                true,
+                null,
+                Collections.singletonList("dt, currency, rate"),
+                Arrays.asList(
+                        changelogRow("+I", "2022-01-01", "US Dollar", 114L), 
// US Dollar
+                        changelogRow("+I", "2022-01-01", "Yen", 1L), // Yen
+                        changelogRow("+I", "2022-01-01", "Euro", 114L), // Euro
+                        changelogRow("+I", "2022-01-02", "Euro", 119L))); // 
Euro
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                true,
+                true,
+                "rate = 114",
+                Collections.singletonList("rate"),
+                Arrays.asList(
+                        changelogRow("+I", 114L), // US Dollar
+                        changelogRow("+I", 114L) // Euro
+                        ));
     }
 
     @Test
@@ -111,7 +182,9 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         // input is dailyRates()
 
         // test batch read
-        String managedTable = collectAndCheckBatchReadWrite(true, false, 
dailyRates());
+        String managedTable =
+                collectAndCheckBatchReadWrite(
+                        true, false, null, Collections.emptyList(), 
dailyRates());
         checkFileStorePath(tEnv, managedTable);
 
         // overwrite dynamic partition
@@ -136,6 +209,52 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                                 // part = 2022-01-02
                                 changelogRow("+I", "Yen", 2L, "2022-01-02")))
                 .close();
+
+        // test partition filter
+        collectAndCheckBatchReadWrite(
+                true, false, "dt >= '2022-01-01'", Collections.emptyList(), 
dailyRates());
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                true,
+                false,
+                "currency = 'US Dollar'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 114L, "2022-01-01")));
+
+        // test partition and field filter
+        collectAndCheckBatchReadWrite(
+                true,
+                false,
+                "dt = '2022-01-01' OR rate > 115",
+                Collections.emptyList(),
+                dailyRates());
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                true,
+                false,
+                null,
+                Collections.singletonList("currency"),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar"),
+                        changelogRow("+I", "US Dollar"),
+                        changelogRow("+I", "Yen"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Euro")));
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                true,
+                false,
+                "rate = 119",
+                Arrays.asList("currency", "dt"),
+                Collections.singletonList(changelogRow("+I", "Euro", 
"2022-01-02")));
     }
 
     @Test
@@ -145,6 +264,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 collectAndCheckBatchReadWrite(
                         false,
                         true,
+                        null,
+                        Collections.emptyList(),
                         Arrays.asList(
                                 changelogRow("+I", "US Dollar", 102L),
                                 changelogRow("+I", "Yen", 1L),
@@ -161,13 +282,87 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 managedTable,
                 Collections.emptyMap(),
                 Collections.singletonList(changelogRow("+I", "Euro", 100L)));
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                false,
+                true,
+                "currency = 'Euro'",
+                Collections.emptyList(),
+                Collections.singletonList(changelogRow("+I", "Euro", 119L)));
+
+        collectAndCheckBatchReadWrite(
+                false,
+                true,
+                "119 >= rate AND 102 < rate",
+                Collections.emptyList(),
+                Collections.singletonList(changelogRow("+I", "Euro", 119L)));
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                false,
+                true,
+                null,
+                Arrays.asList("rate", "currency"),
+                Arrays.asList(
+                        changelogRow("+I", 102L, "US Dollar"),
+                        changelogRow("+I", 1L, "Yen"),
+                        changelogRow("+I", 119L, "Euro")));
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                false,
+                true,
+                "currency = 'Yen'",
+                Collections.singletonList("rate"),
+                Collections.singletonList(changelogRow("+I", 1L)));
     }
 
     @Test
     public void testBatchWriteNonPartitionedRecordsWithoutPk() throws 
Exception {
         // input is rates()
-        String managedTable = collectAndCheckBatchReadWrite(false, false, 
rates());
+        String managedTable =
+                collectAndCheckBatchReadWrite(false, false, null, 
Collections.emptyList(), rates());
         checkFileStorePath(tEnv, managedTable);
+
+        // test field filter
+        collectAndCheckBatchReadWrite(
+                false,
+                false,
+                "currency = 'Euro'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Euro", 119L)));
+
+        collectAndCheckBatchReadWrite(false, false, "rate >= 1", 
Collections.emptyList(), rates());
+
+        // test projection
+        collectAndCheckBatchReadWrite(
+                false,
+                false,
+                null,
+                Collections.singletonList("currency"),
+                Arrays.asList(
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Yen"),
+                        changelogRow("+I", "US Dollar")));
+
+        // test projection and filter
+        collectAndCheckBatchReadWrite(
+                false,
+                false,
+                "rate > 100 OR currency = 'Yen'",
+                Collections.singletonList("currency"),
+                Arrays.asList(
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Yen"),
+                        changelogRow("+I", "US Dollar")));
     }
 
     @Test
@@ -177,6 +372,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         Tuple2<String, BlockingIterator<Row, Row>> tuple =
                 collectAndCheckStreamingReadWriteWithoutClose(
                         Collections.emptyMap(),
+                        "dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency 
= 'HK Dollar'",
+                        Collections.emptyList(),
                         Arrays.asList(
                                 // part = 2022-01-01
                                 changelogRow("+I", "US Dollar", 102L, 
"2022-01-01"),
@@ -186,6 +383,27 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         checkFileStorePath(tEnv, managedTable);
         BlockingIterator<Row, Row> streamIter = tuple.f1;
 
+        // test log store in hybrid mode accepts all filters
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `%s` PARTITION (dt = 
'2022-01-03')\n"
+                                        + "VALUES('HK Dollar', 100), ('Yen', 
20)\n",
+                                managedTable))
+                .await();
+
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `%s` PARTITION (dt = 
'2022-01-04')\n"
+                                        + "VALUES('Yen', 20)\n",
+                                managedTable))
+                .await();
+
+        assertThat(streamIter.collect(2, 10, TimeUnit.SECONDS))
+                .containsExactlyInAnyOrderElementsOf(
+                        Arrays.asList(
+                                changelogRow("+I", "HK Dollar", 100L, 
"2022-01-03"),
+                                changelogRow("+I", "Yen", 20L, "2022-01-03")));
+
         // overwrite partition 2022-01-02
         prepareEnvAndOverwrite(
                 managedTable,
@@ -205,10 +423,55 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                         // part = 2022-01-02
                         changelogRow("+I", "Euro", 100L, "2022-01-02"),
-                        changelogRow("+I", "Yen", 1L, "2022-01-02")));
+                        changelogRow("+I", "Yen", 1L, "2022-01-02"),
+                        // part = 2022-01-03
+                        changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
+                        changelogRow("+I", "Yen", 20L, "2022-01-03"),
+                        // part = 2022-01-04
+                        changelogRow("+I", "Yen", 20L, "2022-01-04")));
 
         // check no changelog generated for streaming read
         assertNoMoreRecords(streamIter);
+
+        // filter on partition
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                true,
+                Collections.emptyMap(),
+                "dt = '2022-01-01'",
+                Collections.emptyList(),
+                Collections.singletonList(changelogRow("+I", "US Dollar", 
102L, "2022-01-01")));
+
+        // test field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                true,
+                Collections.emptyMap(),
+                "currency = 'US Dollar'",
+                Collections.emptyList(),
+                Collections.singletonList(changelogRow("+I", "US Dollar", 
102L, "2022-01-01")));
+
+        // test partition and field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                true,
+                Collections.emptyMap(),
+                "dt = '2022-01-01' AND rate = 1",
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // test projection and filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                true,
+                Collections.emptyMap(),
+                "dt = '2022-01-02' AND currency = 'Euro'",
+                Arrays.asList("rate", "dt", "currency"),
+                Collections.singletonList(changelogRow("+I", 119L, 
"2022-01-02", "Euro")));
     }
 
     @Test
@@ -223,11 +486,61 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         true,
                         true,
                         Collections.emptyMap(),
+                        null,
+                        Collections.emptyList(),
                         Arrays.asList(
                                 // part = 2022-01-01
                                 changelogRow("+I", "US Dollar", 102L, 
"2022-01-01"),
                                 // part = 2022-01-02
                                 changelogRow("+I", "Euro", 119L, 
"2022-01-02"))));
+
+        // test partition filter
+        collectAndCheckStreamingReadWriteWithClose(
+                false,
+                true,
+                true,
+                Collections.emptyMap(),
+                "dt < '2022-01-02'",
+                Collections.emptyList(),
+                Collections.singletonList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+
+        // test field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                false,
+                true,
+                true,
+                Collections.emptyMap(),
+                "rate = 102",
+                Collections.emptyList(),
+                Collections.singletonList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+
+        // test partition and field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                false,
+                true,
+                true,
+                Collections.emptyMap(),
+                "rate = 102 or dt < '2022-01-02'",
+                Collections.emptyList(),
+                Collections.singletonList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+
+        // test projection and filter
+        collectAndCheckStreamingReadWriteWithClose(
+                false,
+                true,
+                true,
+                Collections.emptyMap(),
+                "rate = 102 or dt < '2022-01-02'",
+                Collections.singletonList("currency"),
+                Collections.singletonList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar")));
     }
 
     @Test
@@ -241,11 +554,86 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         true,
                         false,
                         Collections.emptyMap(),
+                        null,
+                        Collections.emptyList(),
                         Arrays.asList(
                                 // part = 2022-01-01
                                 changelogRow("+I", "US Dollar", 102L, 
"2022-01-01"),
                                 // part = 2022-01-02
                                 changelogRow("+I", "Euro", 115L, 
"2022-01-02"))));
+
+        // test partition filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                false,
+                Collections.emptyMap(),
+                "dt IS NOT NULL",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 115L, "2022-01-02")));
+
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                false,
+                Collections.emptyMap(),
+                "dt IS NULL",
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // test field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                false,
+                Collections.emptyMap(),
+                "currency = 'US Dollar' OR rate = 115",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 115L, "2022-01-02")));
+
+        // test partition and field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                false,
+                Collections.emptyMap(),
+                "(dt = '2022-01-02' AND currency = 'US Dollar') OR (dt = 
'2022-01-01' AND rate = 115)",
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // test projection
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                false,
+                Collections.emptyMap(),
+                null,
+                Collections.singletonList("rate"),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", 102L),
+                        // part = 2022-01-02
+                        changelogRow("+I", 115L)));
+
+        // test projection and filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                true,
+                false,
+                Collections.emptyMap(),
+                "dt <> '2022-01-01'",
+                Collections.singletonList("rate"),
+                Collections.singletonList(
+                        // part = 2022-01-02, Euro
+                        changelogRow("+I", 115L)));
     }
 
     @Test
@@ -259,9 +647,41 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         false,
                         true,
                         Collections.emptyMap(),
+                        null,
+                        Collections.emptyList(),
                         Arrays.asList(
                                 changelogRow("+I", "US Dollar", 102L),
                                 changelogRow("+I", "Euro", 119L))));
+
+        // test field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                false,
+                true,
+                Collections.emptyMap(),
+                "currency = 'Yen'",
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // test projection
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                false,
+                true,
+                Collections.emptyMap(),
+                null,
+                Collections.singletonList("currency"),
+                Arrays.asList(changelogRow("+I", "US Dollar"), 
changelogRow("+I", "Euro")));
+
+        // test projection and filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                false,
+                true,
+                Collections.emptyMap(),
+                "rate = 102",
+                Collections.singletonList("currency"),
+                Collections.singletonList(changelogRow("+I", "US Dollar")));
     }
 
     @Test
@@ -275,9 +695,51 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         false,
                         false,
                         Collections.emptyMap(),
+                        null,
+                        Collections.emptyList(),
                         Arrays.asList(
                                 changelogRow("+I", "US Dollar", 102L),
-                                changelogRow("+I", "Euro", 119L))));
+                                changelogRow("+I", "Euro", 119L),
+                                changelogRow("+I", null, 100L),
+                                changelogRow("+I", "HK Dollar", null))));
+
+        // test field filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                false,
+                false,
+                Collections.emptyMap(),
+                "currency IS NOT NULL",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("+I", "HK Dollar", null)));
+
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                false,
+                false,
+                Collections.emptyMap(),
+                "rate IS NOT NULL",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("+I", null, 100L)));
+
+        // test projection and filter
+        collectAndCheckStreamingReadWriteWithClose(
+                true,
+                false,
+                false,
+                Collections.emptyMap(),
+                "currency IS NOT NULL AND rate is NOT NULL",
+                Collections.singletonList("rate"),
+                Arrays.asList(
+                        changelogRow("+I", 102L), // US Dollar
+                        changelogRow("+I", 119L)) // Euro
+                );
     }
 
     @Test
@@ -287,6 +749,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 false,
                 true,
                 true,
+                null,
+                Collections.emptyList(),
                 Arrays.asList(
                         // part = 2022-01-01
                         changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
@@ -298,6 +762,86 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("-D", "Euro", 116L, "2022-01-01"),
                         // part = 2022-01-02
                         changelogRow("+I", "Euro", 119L, "2022-01-02")));
+
+        // test partition filter
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
+                "dt = '2022-01-01'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-U", "Euro", 114L, "2022-01-01"),
+                        changelogRow("+U", "Euro", 116L, "2022-01-01"),
+                        changelogRow("-D", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-D", "Euro", 116L, "2022-01-01")));
+
+        // test field filter
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
+                "currency = 'Yen'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("-D", "Yen", 1L, "2022-01-01")));
+
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
+                "rate = 114",
+                Collections.emptyList(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        changelogRow("-U", "Euro", 114L, "2022-01-01")));
+
+        // test partition and field filter
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
+                "rate = 114 AND dt = '2022-01-02'",
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // test projection
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
+                null,
+                Collections.singletonList("rate"),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", 102L), // US Dollar
+                        changelogRow("+I", 114L), // Euro
+                        changelogRow("+I", 1L), // Yen
+                        changelogRow("-U", 114L), // Euro
+                        changelogRow("+U", 116L), // Euro
+                        changelogRow("-D", 1L), // Yen
+                        changelogRow("-D", 116L), // Euro
+                        // part = 2022-01-02
+                        changelogRow("+I", 119L) // Euro
+                        ));
+
+        // test projection and filter
+        collectLatestLogAndCheck(
+                false,
+                true,
+                true,
+                "dt = '2022-01-02'",
+                Collections.singletonList("rate"),
+                Collections.singletonList(
+                        // part = 2022-01-02, Euro
+                        changelogRow("+I", 119L)));
     }
 
     @Test
@@ -307,6 +851,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 false,
                 true,
                 false,
+                null,
+                Collections.emptyList(),
                 Arrays.asList(
                         // part = 2022-01-01
                         changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
@@ -329,6 +875,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 false,
                 false,
                 true,
+                null,
+                Collections.emptyList(),
                 Arrays.asList(
                         changelogRow("+I", "US Dollar", 102L),
                         changelogRow("+I", "Euro", 114L),
@@ -338,6 +886,49 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("-D", "Euro", 116L),
                         changelogRow("+I", "Euro", 119L),
                         changelogRow("-D", "Yen", 1L)));
+
+        // test field filter
+        collectLatestLogAndCheck(
+                false,
+                false,
+                true,
+                "currency = 'Euro'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("-U", "Euro", 114L),
+                        changelogRow("+U", "Euro", 116L),
+                        changelogRow("-D", "Euro", 116L),
+                        changelogRow("+I", "Euro", 119L)));
+
+        // test projection
+        collectLatestLogAndCheck(
+                false,
+                false,
+                true,
+                null,
+                Collections.singletonList("currency"),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Yen"),
+                        changelogRow("-D", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("-D", "Yen")));
+
+        // test projection and filter
+        collectLatestLogAndCheck(
+                false,
+                false,
+                true,
+                "currency = 'Euro'",
+                Collections.singletonList("rate"),
+                Arrays.asList(
+                        changelogRow("+I", 114L),
+                        changelogRow("-U", 114L),
+                        changelogRow("+U", 116L),
+                        changelogRow("-D", 116L),
+                        changelogRow("+I", 119L)));
     }
 
     @Test
@@ -347,6 +938,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 false,
                 false,
                 false,
+                null,
+                Collections.emptyList(),
                 Arrays.asList(
                         changelogRow("+I", "US Dollar", 102L),
                         changelogRow("+I", "Euro", 114L),
@@ -357,7 +950,77 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("+I", "Euro", 119L),
                         changelogRow("-D", "Euro", 119L),
                         changelogRow("+I", "Euro", 119L),
-                        changelogRow("-D", "Yen", 1L)));
+                        changelogRow("-D", "Yen", 1L),
+                        changelogRow("+I", null, 100L),
+                        changelogRow("+I", "HK Dollar", null)));
+
+        // test field filter
+        collectLatestLogAndCheck(
+                false,
+                false,
+                false,
+                "currency = 'Euro'",
+                Collections.emptyList(),
+                Arrays.asList(
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("-D", "Euro", 114L),
+                        changelogRow("+I", "Euro", 116L),
+                        changelogRow("-D", "Euro", 116L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("-D", "Euro", 119L),
+                        changelogRow("+I", "Euro", 119L)));
+
+        // test projection
+        collectLatestLogAndCheck(
+                false,
+                false,
+                false,
+                null,
+                Arrays.asList("currency", "rate"),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Euro", 114L),
+                        changelogRow("+I", "Yen", 1L),
+                        changelogRow("-D", "Euro", 114L),
+                        changelogRow("+I", "Euro", 116L),
+                        changelogRow("-D", "Euro", 116L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("-D", "Euro", 119L),
+                        changelogRow("+I", "Euro", 119L),
+                        changelogRow("-D", "Yen", 1L),
+                        changelogRow("+I", null, 100L),
+                        changelogRow("+I", "HK Dollar", null)));
+
+        // test projection and filter
+        collectLatestLogAndCheck(
+                false,
+                false,
+                false,
+                "currency IS NOT NULL",
+                Collections.singletonList("currency"),
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("+I", "Yen"),
+                        changelogRow("-D", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("-D", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("-D", "Euro"),
+                        changelogRow("+I", "Euro"),
+                        changelogRow("-D", "Yen"),
+                        changelogRow("+I", "HK Dollar")));
+
+        collectLatestLogAndCheck(
+                false,
+                false,
+                false,
+                "rate = 119",
+                Collections.singletonList("currency"),
+                Arrays.asList(
+                        changelogRow("+I", "Euro"),
+                        changelogRow("-D", "Euro"),
+                        changelogRow("+I", "Euro")));
     }
 
     @Test
@@ -372,10 +1035,42 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("+U", "Euro", 119L));
 
         // currency as pk
-        collectLatestLogAndCheck(true, false, true, expected);
+        collectLatestLogAndCheck(true, false, true, null, 
Collections.emptyList(), expected);
 
         // without pk
-        collectLatestLogAndCheck(true, false, true, expected);
+        collectLatestLogAndCheck(true, false, true, null, 
Collections.emptyList(), expected);
+
+        // test field filter
+        collectLatestLogAndCheck(
+                true,
+                false,
+                true,
+                "rate = 114",
+                Collections.emptyList(),
+                Arrays.asList(changelogRow("+I", "Euro", 114L), 
changelogRow("-U", "Euro", 114L)));
+
+        // test projection
+        collectLatestLogAndCheck(
+                true,
+                false,
+                true,
+                null,
+                Collections.singletonList("rate"),
+                Arrays.asList(
+                        changelogRow("+I", 102L),
+                        changelogRow("+I", 114L),
+                        changelogRow("+I", 1L),
+                        changelogRow("-U", 114L),
+                        changelogRow("+U", 119L)));
+
+        // test projection and filter
+        collectLatestLogAndCheck(
+                true,
+                false,
+                true,
+                "rate = 114",
+                Collections.singletonList("currency"),
+                Arrays.asList(changelogRow("+I", "Euro"), changelogRow("-U", 
"Euro")));
     }
 
     @Test
@@ -391,6 +1086,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                         changelogRow("+I", "Yen", 1L, "2022-01-01"),
                         changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        changelogRow("-U", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+U", "US Dollar", 114L, "2022-01-01"),
                         // part = 2022-01-02
                         changelogRow("+I", "Euro", 119L, "2022-01-02")));
 
@@ -456,10 +1153,13 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         changelogRow("+I", "Euro", 119L, "2022-01-02")));
     }
 
-    // ------------------------ Tools ----------------------------------
-
     private String collectAndCheckBatchReadWrite(
-            boolean partitioned, boolean hasPk, List<Row> expected) throws 
Exception {
+            boolean partitioned,
+            boolean hasPk,
+            @Nullable String filter,
+            List<String> projection,
+            List<Row> expected)
+            throws Exception {
         return collectAndCheckUnderSameEnv(
                         false,
                         false,
@@ -468,6 +1168,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                         hasPk,
                         true,
                         Collections.emptyMap(),
+                        filter,
+                        projection,
                         expected)
                 .f0;
     }
@@ -477,31 +1179,60 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
             boolean partitioned,
             boolean hasPk,
             Map<String, String> readHints,
+            @Nullable String filter,
+            List<String> projection,
             List<Row> expected)
             throws Exception {
         Tuple2<String, BlockingIterator<Row, Row>> tuple =
                 collectAndCheckUnderSameEnv(
-                        true, enableLogStore, false, partitioned, hasPk, true, 
readHints, expected);
+                        true,
+                        enableLogStore,
+                        false,
+                        partitioned,
+                        hasPk,
+                        true,
+                        readHints,
+                        filter,
+                        projection,
+                        expected);
         tuple.f1.close();
         return tuple.f0;
     }
 
     private Tuple2<String, BlockingIterator<Row, Row>>
             collectAndCheckStreamingReadWriteWithoutClose(
-                    Map<String, String> readHints, List<Row> expected) throws 
Exception {
+                    Map<String, String> readHints,
+                    @Nullable String filter,
+                    List<String> projection,
+                    List<Row> expected)
+                    throws Exception {
         return collectAndCheckUnderSameEnv(
-                true, true, false, true, true, true, readHints, expected);
+                true, true, false, true, true, true, readHints, filter, 
projection, expected);
     }
 
     private void collectLatestLogAndCheck(
-            boolean insertOnly, boolean partitioned, boolean hasPk, List<Row> 
expected)
+            boolean insertOnly,
+            boolean partitioned,
+            boolean hasPk,
+            @Nullable String filter,
+            List<String> projection,
+            List<Row> expected)
             throws Exception {
         Map<String, String> hints = new HashMap<>();
         hints.put(
                 LOG_PREFIX + LogOptions.SCAN.key(),
                 LogOptions.LogStartupMode.LATEST.name().toLowerCase());
         collectAndCheckUnderSameEnv(
-                        true, true, insertOnly, partitioned, hasPk, false, 
hints, expected)
+                        true,
+                        true,
+                        insertOnly,
+                        partitioned,
+                        hasPk,
+                        false,
+                        hints,
+                        filter,
+                        projection,
+                        expected)
                 .f1
                 .close();
     }
@@ -517,7 +1248,16 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp");
         hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(), 
String.valueOf(timestamp));
         collectAndCheckUnderSameEnv(
-                        true, true, insertOnly, partitioned, hasPk, true, 
hints, expected)
+                        true,
+                        true,
+                        insertOnly,
+                        partitioned,
+                        hasPk,
+                        true,
+                        hints,
+                        null,
+                        Collections.emptyList(),
+                        expected)
                 .f1
                 .close();
     }
@@ -530,6 +1270,8 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
             boolean hasPk,
             boolean writeFirst,
             Map<String, String> readHints,
+            @Nullable String filter,
+            List<String> projection,
             List<Row> expected)
             throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
@@ -565,7 +1307,7 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         tEnv.executeSql(managedTableDdl);
 
         String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
-        String selectQuery = prepareSimpleSelectQuery(managedTable, readHints);
+        String selectQuery = prepareSelectQuery(managedTable, readHints, 
filter, projection);
 
         BlockingIterator<Row, Row> iterator;
         if (writeFirst) {
@@ -621,11 +1363,13 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
             Map<String, String> hints,
             List<Row> expectedRecords)
             throws Exception {
-        String selectQuery = prepareSimpleSelectQuery(managedTable, hints);
         List<Row> actual = new ArrayList<>();
         BlockingIterator<Row, Row> iterator =
-                collect(tEnv, selectQuery, expectedRecords.size(), actual);
-
+                collect(
+                        tEnv,
+                        prepareSimpleSelectQuery(managedTable, hints),
+                        expectedRecords.size(),
+                        actual);
         
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecords);
         return iterator;
     }
@@ -659,20 +1403,26 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
         StringBuilder ddl =
                 new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                         .append(String.format("`%s`", managedTableName));
+        ddl.append(prepareOptions(tableOptions))
+                .append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", 
sourceTableName));
+        return ddl.toString();
+    }
+
+    private static String prepareOptions(Map<String, String> tableOptions) {
+        StringBuilder with = new StringBuilder();
         if (tableOptions.size() > 0) {
-            ddl.append(" WITH (\n");
+            with.append(" WITH (\n");
             tableOptions.forEach(
                     (k, v) ->
-                            ddl.append("  ")
+                            with.append("  ")
                                     .append(String.format("'%s'", k))
                                     .append(" = ")
                                     .append(String.format("'%s',\n", v)));
-            int len = ddl.length();
-            ddl.delete(len - 2, len);
-            ddl.append(")");
+            int len = with.length();
+            with.delete(len - 2, len);
+            with.append(")");
         }
-        ddl.append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", 
sourceTableName));
-        return ddl.toString();
+        return with.toString();
     }
 
     private static String prepareInsertIntoQuery(String sourceTableName, 
String managedTableName) {
@@ -743,7 +1493,25 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
     }
 
     private static String prepareSimpleSelectQuery(String tableName, 
Map<String, String> hints) {
-        return String.format("SELECT * FROM `%s` %s", tableName, 
buildHints(hints));
+        return prepareSelectQuery(tableName, hints, null, 
Collections.emptyList());
+    }
+
+    private static String prepareSelectQuery(
+            String tableName,
+            Map<String, String> hints,
+            @Nullable String filter,
+            List<String> projections) {
+        StringBuilder queryBuilder =
+                new StringBuilder(
+                        String.format(
+                                "SELECT %s FROM `%s` %s",
+                                projections.isEmpty() ? "*" : String.join(", 
", projections),
+                                tableName,
+                                buildHints(hints)));
+        if (filter != null) {
+            queryBuilder.append("\nWHERE ").append(filter);
+        }
+        return queryBuilder.toString();
     }
 
     private static String buildHints(Map<String, String> hints) {
@@ -868,6 +1636,7 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 changelogRow("+I", "Euro", 114L, "2022-01-01"),
                 changelogRow("+I", "Yen", 1L, "2022-01-01"),
                 changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                 // part = 2022-01-02
                 changelogRow("+I", "Euro", 119L, "2022-01-02"));
     }
@@ -909,7 +1678,9 @@ public class ReadWriteTableITCase extends 
KafkaTableTestBase {
                 changelogRow("+I", "Euro", 119L),
                 changelogRow("-U", "Euro", 119L),
                 changelogRow("+U", "Euro", 119L),
-                changelogRow("-D", "Yen", 1L));
+                changelogRow("-D", "Yen", 1L),
+                changelogRow("+I", null, 100L),
+                changelogRow("+I", "HK Dollar", null));
     }
 
     private static List<Row> dailyRatesChangelogWithUB() {
diff --git 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
index e17699f..7a51ce4 100644
--- 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
+++ 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
@@ -44,7 +44,7 @@ public class KafkaLogDeserializationSchema implements 
KafkaDeserializationSchema
     private final TypeInformation<RowData> producedType;
     private final int fieldCount;
     private final int[] primaryKey;
-    @Nullable private final DeserializationSchema<RowData> keyDeserializer;
+    @Nullable private final DeserializationSchema<RowData> 
primaryKeyDeserializer;
     private final DeserializationSchema<RowData> valueDeserializer;
     private final RowData.FieldGetter[] keyFieldGetters;
     @Nullable private final int[][] projectFields;
@@ -54,11 +54,11 @@ public class KafkaLogDeserializationSchema implements 
KafkaDeserializationSchema
     public KafkaLogDeserializationSchema(
             DataType physicalType,
             int[] primaryKey,
-            @Nullable DeserializationSchema<RowData> keyDeserializer,
+            @Nullable DeserializationSchema<RowData> primaryKeyDeserializer,
             DeserializationSchema<RowData> valueDeserializer,
             @Nullable int[][] projectFields) {
         this.primaryKey = primaryKey;
-        this.keyDeserializer = keyDeserializer;
+        this.primaryKeyDeserializer = primaryKeyDeserializer;
         this.valueDeserializer = valueDeserializer;
         DataType projectedType =
                 projectFields == null
@@ -82,8 +82,8 @@ public class KafkaLogDeserializationSchema implements 
KafkaDeserializationSchema
 
     @Override
     public void open(DeserializationSchema.InitializationContext context) 
throws Exception {
-        if (keyDeserializer != null) {
-            keyDeserializer.open(context);
+        if (primaryKeyDeserializer != null) {
+            primaryKeyDeserializer.open(context);
         }
         valueDeserializer.open(context);
         projectCollector = new ProjectCollector();
@@ -107,7 +107,7 @@ public class KafkaLogDeserializationSchema implements 
KafkaDeserializationSchema
         Collector<RowData> collector = 
projectCollector.project(underCollector);
 
         if (primaryKey.length > 0 && record.value() == null) {
-            RowData key = keyDeserializer.deserialize(record.key());
+            RowData key = primaryKeyDeserializer.deserialize(record.key());
             GenericRowData value = new GenericRowData(RowKind.DELETE, 
fieldCount);
             for (int i = 0; i < primaryKey.length; i++) {
                 value.setField(primaryKey[i], 
keyFieldGetters[i].getFieldOrNull(key));
diff --git 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index a5e787b..9904934 100644
--- 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -167,11 +167,11 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, 
context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
-        DeserializationSchema<RowData> keyDeserializer = null;
+        DeserializationSchema<RowData> primaryKeyDeserializer = null;
         int[] primaryKey = schema.getPrimaryKeyIndexes();
         if (primaryKey.length > 0) {
             DataType keyType = DataTypeUtils.projectRow(physicalType, 
primaryKey);
-            keyDeserializer =
+            primaryKeyDeserializer =
                     LogStoreTableFactory.getKeyDecodingFormat(helper)
                             .createRuntimeDecoder(sourceContext, keyType);
         }
@@ -183,7 +183,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
                 toKafkaProperties(helper.getOptions()),
                 physicalType,
                 primaryKey,
-                keyDeserializer,
+                primaryKeyDeserializer,
                 valueDeserializer,
                 projectFields,
                 helper.getOptions().get(CONSISTENCY),

Reply via email to