This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.1 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit b58bd4cc5a94297765e4c150046801233c587fcf Author: Jane Chan <[email protected]> AuthorDate: Fri Apr 1 18:34:20 2022 +0800 [FLINK-26857] Add ITCase for projection and filter predicate This closes #69 --- .../store/connector/source/TableStoreSource.java | 4 +- .../store/connector/ReadWriteTableITCase.java | 835 ++++++++++++++++++++- .../store/kafka/KafkaLogDeserializationSchema.java | 12 +- .../table/store/kafka/KafkaLogStoreFactory.java | 6 +- 4 files changed, 813 insertions(+), 44 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java index 8de30f6..e1681fe 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java @@ -44,7 +44,6 @@ import org.apache.flink.table.types.logical.LogicalType; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -165,8 +164,7 @@ public class TableStoreSource fieldFilters = filters; } return Result.of( - new ArrayList<>(filters), - fieldFilters == null ? Collections.emptyList() : fieldFilters); + filters, streaming && logStoreTableFactory != null ? filters : fieldFilters); } @Override diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java index 3458529..6ea0367 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java @@ -35,6 +35,8 @@ import org.apache.flink.types.Row; import org.junit.Test; +import javax.annotation.Nullable; + import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -64,13 +66,15 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { List<Row> expectedRecords = Arrays.asList( // part = 2022-01-01 - changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "US Dollar", 114L, "2022-01-01"), changelogRow("+I", "Yen", 1L, "2022-01-01"), changelogRow("+I", "Euro", 114L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 119L, "2022-01-02")); // test batch read - String managedTable = collectAndCheckBatchReadWrite(true, true, expectedRecords); + String managedTable = + collectAndCheckBatchReadWrite( + true, true, null, Collections.emptyList(), expectedRecords); checkFileStorePath(tEnv, managedTable); // test streaming read @@ -97,13 +101,80 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { Collections.emptyMap(), Arrays.asList( // part = 2022-01-01 - changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "US Dollar", 114L, "2022-01-01"), changelogRow("+I", "Yen", 1L, "2022-01-01"), changelogRow("+I", "Euro", 114L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 100L, "2022-01-02"), changelogRow("+I", "Yen", 1L, "2022-01-02"))) .close(); + + // test partition filter + List<Row> expected = + Arrays.asList( + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "US Dollar", 114L, "2022-01-01")); + collectAndCheckBatchReadWrite( + true, true, "dt <> '2022-01-02'", Collections.emptyList(), expected); + + collectAndCheckBatchReadWrite( + true, true, "dt = '2022-01-01'", Collections.emptyList(), expected); + + // test field filter + collectAndCheckBatchReadWrite( + true, + true, + "rate >= 100", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 114L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 119L, "2022-01-02"))); + + // test partition and field filter + collectAndCheckBatchReadWrite( + true, + true, + "rate >= 100 AND dt = '2022-01-02'", + Collections.emptyList(), + Collections.singletonList(changelogRow("+I", "Euro", 119L, "2022-01-02"))); + + // test projection + collectAndCheckBatchReadWrite( + true, + true, + null, + Collections.singletonList("dt"), + Arrays.asList( + changelogRow("+I", "2022-01-01"), // US Dollar + changelogRow("+I", "2022-01-01"), // Yen + changelogRow("+I", "2022-01-01"), // Euro + changelogRow("+I", "2022-01-02"))); // Euro + + collectAndCheckBatchReadWrite( + true, + true, + null, + Collections.singletonList("dt, currency, rate"), + Arrays.asList( + changelogRow("+I", "2022-01-01", "US Dollar", 114L), // US Dollar + changelogRow("+I", "2022-01-01", "Yen", 1L), // Yen + changelogRow("+I", "2022-01-01", "Euro", 114L), // Euro + changelogRow("+I", "2022-01-02", "Euro", 119L))); // Euro + + // test projection and filter + collectAndCheckBatchReadWrite( + true, + true, + "rate = 114", + Collections.singletonList("rate"), + Arrays.asList( + changelogRow("+I", 114L), // US Dollar + changelogRow("+I", 114L) // Euro + )); } @Test @@ -111,7 +182,9 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { // input is dailyRates() // test batch read - String managedTable = collectAndCheckBatchReadWrite(true, false, dailyRates()); + String managedTable = + collectAndCheckBatchReadWrite( + true, false, null, Collections.emptyList(), dailyRates()); checkFileStorePath(tEnv, managedTable); // overwrite dynamic partition @@ -136,6 +209,52 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { // part = 2022-01-02 changelogRow("+I", "Yen", 2L, "2022-01-02"))) .close(); + + // test partition filter + collectAndCheckBatchReadWrite( + true, false, "dt >= '2022-01-01'", Collections.emptyList(), dailyRates()); + + // test field filter + collectAndCheckBatchReadWrite( + true, + false, + "currency = 'US Dollar'", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 114L, "2022-01-01"))); + + // test partition and field filter + collectAndCheckBatchReadWrite( + true, + false, + "dt = '2022-01-01' OR rate > 115", + Collections.emptyList(), + dailyRates()); + + // test projection + collectAndCheckBatchReadWrite( + true, + false, + null, + Collections.singletonList("currency"), + Arrays.asList( + changelogRow("+I", "US Dollar"), + changelogRow("+I", "US Dollar"), + changelogRow("+I", "Yen"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Euro"))); + + // test projection and filter + collectAndCheckBatchReadWrite( + true, + false, + "rate = 119", + Arrays.asList("currency", "dt"), + Collections.singletonList(changelogRow("+I", "Euro", "2022-01-02"))); } @Test @@ -145,6 +264,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { collectAndCheckBatchReadWrite( false, true, + null, + Collections.emptyList(), Arrays.asList( changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Yen", 1L), @@ -161,13 +282,87 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { managedTable, Collections.emptyMap(), Collections.singletonList(changelogRow("+I", "Euro", 100L))); + + // test field filter + collectAndCheckBatchReadWrite( + false, + true, + "currency = 'Euro'", + Collections.emptyList(), + Collections.singletonList(changelogRow("+I", "Euro", 119L))); + + collectAndCheckBatchReadWrite( + false, + true, + "119 >= rate AND 102 < rate", + Collections.emptyList(), + Collections.singletonList(changelogRow("+I", "Euro", 119L))); + + // test projection + collectAndCheckBatchReadWrite( + false, + true, + null, + Arrays.asList("rate", "currency"), + Arrays.asList( + changelogRow("+I", 102L, "US Dollar"), + changelogRow("+I", 1L, "Yen"), + changelogRow("+I", 119L, "Euro"))); + + // test projection and filter + collectAndCheckBatchReadWrite( + false, + true, + "currency = 'Yen'", + Collections.singletonList("rate"), + Collections.singletonList(changelogRow("+I", 1L))); } @Test public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception { // input is rates() - String managedTable = collectAndCheckBatchReadWrite(false, false, rates()); + String managedTable = + collectAndCheckBatchReadWrite(false, false, null, Collections.emptyList(), rates()); checkFileStorePath(tEnv, managedTable); + + // test field filter + collectAndCheckBatchReadWrite( + false, + false, + "currency = 'Euro'", + Collections.emptyList(), + Arrays.asList( + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Euro", 119L))); + + collectAndCheckBatchReadWrite(false, false, "rate >= 1", Collections.emptyList(), rates()); + + // test projection + collectAndCheckBatchReadWrite( + false, + false, + null, + Collections.singletonList("currency"), + Arrays.asList( + changelogRow("+I", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Yen"), + changelogRow("+I", "US Dollar"))); + + // test projection and filter + collectAndCheckBatchReadWrite( + false, + false, + "rate > 100 OR currency = 'Yen'", + Collections.singletonList("currency"), + Arrays.asList( + changelogRow("+I", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Yen"), + changelogRow("+I", "US Dollar"))); } @Test @@ -177,6 +372,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { Tuple2<String, BlockingIterator<Row, Row>> tuple = collectAndCheckStreamingReadWriteWithoutClose( Collections.emptyMap(), + "dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency = 'HK Dollar'", + Collections.emptyList(), Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), @@ -186,6 +383,27 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { checkFileStorePath(tEnv, managedTable); BlockingIterator<Row, Row> streamIter = tuple.f1; + // test log store in hybrid mode accepts all filters + tEnv.executeSql( + String.format( + "INSERT INTO `%s` PARTITION (dt = '2022-01-03')\n" + + "VALUES('HK Dollar', 100), ('Yen', 20)\n", + managedTable)) + .await(); + + tEnv.executeSql( + String.format( + "INSERT INTO `%s` PARTITION (dt = '2022-01-04')\n" + + "VALUES('Yen', 20)\n", + managedTable)) + .await(); + + assertThat(streamIter.collect(2, 10, TimeUnit.SECONDS)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList( + changelogRow("+I", "HK Dollar", 100L, "2022-01-03"), + changelogRow("+I", "Yen", 20L, "2022-01-03"))); + // overwrite partition 2022-01-02 prepareEnvAndOverwrite( managedTable, @@ -205,10 +423,55 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+I", "US Dollar", 102L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 100L, "2022-01-02"), - changelogRow("+I", "Yen", 1L, "2022-01-02"))); + changelogRow("+I", "Yen", 1L, "2022-01-02"), + // part = 2022-01-03 + changelogRow("+I", "HK Dollar", 100L, "2022-01-03"), + changelogRow("+I", "Yen", 20L, "2022-01-03"), + // part = 2022-01-04 + changelogRow("+I", "Yen", 20L, "2022-01-04"))); // check no changelog generated for streaming read assertNoMoreRecords(streamIter); + + // filter on partition + collectAndCheckStreamingReadWriteWithClose( + true, + true, + true, + Collections.emptyMap(), + "dt = '2022-01-01'", + Collections.emptyList(), + Collections.singletonList(changelogRow("+I", "US Dollar", 102L, "2022-01-01"))); + + // test field filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + true, + Collections.emptyMap(), + "currency = 'US Dollar'", + Collections.emptyList(), + Collections.singletonList(changelogRow("+I", "US Dollar", 102L, "2022-01-01"))); + + // test partition and field filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + true, + Collections.emptyMap(), + "dt = '2022-01-01' AND rate = 1", + Collections.emptyList(), + Collections.emptyList()); + + // test projection and filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + true, + Collections.emptyMap(), + "dt = '2022-01-02' AND currency = 'Euro'", + Arrays.asList("rate", "dt", "currency"), + Collections.singletonList(changelogRow("+I", 119L, "2022-01-02", "Euro"))); } @Test @@ -223,11 +486,61 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { true, true, Collections.emptyMap(), + null, + Collections.emptyList(), Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 119L, "2022-01-02")))); + + // test partition filter + collectAndCheckStreamingReadWriteWithClose( + false, + true, + true, + Collections.emptyMap(), + "dt < '2022-01-02'", + Collections.emptyList(), + Collections.singletonList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"))); + + // test field filter + collectAndCheckStreamingReadWriteWithClose( + false, + true, + true, + Collections.emptyMap(), + "rate = 102", + Collections.emptyList(), + Collections.singletonList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"))); + + // test partition and field filter + collectAndCheckStreamingReadWriteWithClose( + false, + true, + true, + Collections.emptyMap(), + "rate = 102 or dt < '2022-01-02'", + Collections.emptyList(), + Collections.singletonList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"))); + + // test projection and filter + collectAndCheckStreamingReadWriteWithClose( + false, + true, + true, + Collections.emptyMap(), + "rate = 102 or dt < '2022-01-02'", + Collections.singletonList("currency"), + Collections.singletonList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar"))); } @Test @@ -241,11 +554,86 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { true, false, Collections.emptyMap(), + null, + Collections.emptyList(), Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 115L, "2022-01-02")))); + + // test partition filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + "dt IS NOT NULL", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 115L, "2022-01-02"))); + + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + "dt IS NULL", + Collections.emptyList(), + Collections.emptyList()); + + // test field filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + "currency = 'US Dollar' OR rate = 115", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 115L, "2022-01-02"))); + + // test partition and field filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + "(dt = '2022-01-02' AND currency = 'US Dollar') OR (dt = '2022-01-01' AND rate = 115)", + Collections.emptyList(), + Collections.emptyList()); + + // test projection + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + null, + Collections.singletonList("rate"), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", 102L), + // part = 2022-01-02 + changelogRow("+I", 115L))); + + // test projection and filter + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + "dt <> '2022-01-01'", + Collections.singletonList("rate"), + Collections.singletonList( + // part = 2022-01-02, Euro + changelogRow("+I", 115L))); } @Test @@ -259,9 +647,41 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { false, true, Collections.emptyMap(), + null, + Collections.emptyList(), Arrays.asList( changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L)))); + + // test field filter + collectAndCheckStreamingReadWriteWithClose( + true, + false, + true, + Collections.emptyMap(), + "currency = 'Yen'", + Collections.emptyList(), + Collections.emptyList()); + + // test projection + collectAndCheckStreamingReadWriteWithClose( + true, + false, + true, + Collections.emptyMap(), + null, + Collections.singletonList("currency"), + Arrays.asList(changelogRow("+I", "US Dollar"), changelogRow("+I", "Euro"))); + + // test projection and filter + collectAndCheckStreamingReadWriteWithClose( + true, + false, + true, + Collections.emptyMap(), + "rate = 102", + Collections.singletonList("currency"), + Collections.singletonList(changelogRow("+I", "US Dollar"))); } @Test @@ -275,9 +695,51 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { false, false, Collections.emptyMap(), + null, + Collections.emptyList(), Arrays.asList( changelogRow("+I", "US Dollar", 102L), - changelogRow("+I", "Euro", 119L)))); + changelogRow("+I", "Euro", 119L), + changelogRow("+I", null, 100L), + changelogRow("+I", "HK Dollar", null)))); + + // test field filter + collectAndCheckStreamingReadWriteWithClose( + true, + false, + false, + Collections.emptyMap(), + "currency IS NOT NULL", + Collections.emptyList(), + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 119L), + changelogRow("+I", "HK Dollar", null))); + + collectAndCheckStreamingReadWriteWithClose( + true, + false, + false, + Collections.emptyMap(), + "rate IS NOT NULL", + Collections.emptyList(), + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 119L), + changelogRow("+I", null, 100L))); + + // test projection and filter + collectAndCheckStreamingReadWriteWithClose( + true, + false, + false, + Collections.emptyMap(), + "currency IS NOT NULL AND rate is NOT NULL", + Collections.singletonList("rate"), + Arrays.asList( + changelogRow("+I", 102L), // US Dollar + changelogRow("+I", 119L)) // Euro + ); } @Test @@ -287,6 +749,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { false, true, true, + null, + Collections.emptyList(), Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), @@ -298,6 +762,86 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("-D", "Euro", 116L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 119L, "2022-01-02"))); + + // test partition filter + collectLatestLogAndCheck( + false, + true, + true, + "dt = '2022-01-01'", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("-U", "Euro", 114L, "2022-01-01"), + changelogRow("+U", "Euro", 116L, "2022-01-01"), + changelogRow("-D", "Yen", 1L, "2022-01-01"), + changelogRow("-D", "Euro", 116L, "2022-01-01"))); + + // test field filter + collectLatestLogAndCheck( + false, + true, + true, + "currency = 'Yen'", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("-D", "Yen", 1L, "2022-01-01"))); + + collectLatestLogAndCheck( + false, + true, + true, + "rate = 114", + Collections.emptyList(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("-U", "Euro", 114L, "2022-01-01"))); + + // test partition and field filter + collectLatestLogAndCheck( + false, + true, + true, + "rate = 114 AND dt = '2022-01-02'", + Collections.emptyList(), + Collections.emptyList()); + + // test projection + collectLatestLogAndCheck( + false, + true, + true, + null, + Collections.singletonList("rate"), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", 102L), // US Dollar + changelogRow("+I", 114L), // Euro + changelogRow("+I", 1L), // Yen + changelogRow("-U", 114L), // Euro + changelogRow("+U", 116L), // Euro + changelogRow("-D", 1L), // Yen + changelogRow("-D", 116L), // Euro + // part = 2022-01-02 + changelogRow("+I", 119L) // Euro + )); + + // test projection and filter + collectLatestLogAndCheck( + false, + true, + true, + "dt = '2022-01-02'", + Collections.singletonList("rate"), + Collections.singletonList( + // part = 2022-01-02, Euro + changelogRow("+I", 119L))); } @Test @@ -307,6 +851,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { false, true, false, + null, + Collections.emptyList(), Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), @@ -329,6 +875,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { false, false, true, + null, + Collections.emptyList(), Arrays.asList( changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 114L), @@ -338,6 +886,49 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("-D", "Euro", 116L), changelogRow("+I", "Euro", 119L), changelogRow("-D", "Yen", 1L))); + + // test field filter + collectLatestLogAndCheck( + false, + false, + true, + "currency = 'Euro'", + Collections.emptyList(), + Arrays.asList( + changelogRow("+I", "Euro", 114L), + changelogRow("-U", "Euro", 114L), + changelogRow("+U", "Euro", 116L), + changelogRow("-D", "Euro", 116L), + changelogRow("+I", "Euro", 119L))); + + // test projection + collectLatestLogAndCheck( + false, + false, + true, + null, + Collections.singletonList("currency"), + Arrays.asList( + changelogRow("+I", "US Dollar"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Yen"), + changelogRow("-D", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("-D", "Yen"))); + + // test projection and filter + collectLatestLogAndCheck( + false, + false, + true, + "currency = 'Euro'", + Collections.singletonList("rate"), + Arrays.asList( + changelogRow("+I", 114L), + changelogRow("-U", 114L), + changelogRow("+U", 116L), + changelogRow("-D", 116L), + changelogRow("+I", 119L))); } @Test @@ -347,6 +938,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { false, false, false, + null, + Collections.emptyList(), Arrays.asList( changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 114L), @@ -357,7 +950,77 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+I", "Euro", 119L), changelogRow("-D", "Euro", 119L), changelogRow("+I", "Euro", 119L), - changelogRow("-D", "Yen", 1L))); + changelogRow("-D", "Yen", 1L), + changelogRow("+I", null, 100L), + changelogRow("+I", "HK Dollar", null))); + + // test field filter + collectLatestLogAndCheck( + false, + false, + false, + "currency = 'Euro'", + Collections.emptyList(), + Arrays.asList( + changelogRow("+I", "Euro", 114L), + changelogRow("-D", "Euro", 114L), + changelogRow("+I", "Euro", 116L), + changelogRow("-D", "Euro", 116L), + changelogRow("+I", "Euro", 119L), + changelogRow("-D", "Euro", 119L), + changelogRow("+I", "Euro", 119L))); + + // test projection + collectLatestLogAndCheck( + false, + false, + false, + null, + Arrays.asList("currency", "rate"), + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Yen", 1L), + changelogRow("-D", "Euro", 114L), + changelogRow("+I", "Euro", 116L), + changelogRow("-D", "Euro", 116L), + changelogRow("+I", "Euro", 119L), + changelogRow("-D", "Euro", 119L), + changelogRow("+I", "Euro", 119L), + changelogRow("-D", "Yen", 1L), + changelogRow("+I", null, 100L), + changelogRow("+I", "HK Dollar", null))); + + // test projection and filter + collectLatestLogAndCheck( + false, + false, + false, + "currency IS NOT NULL", + Collections.singletonList("currency"), + Arrays.asList( + changelogRow("+I", "US Dollar"), + changelogRow("+I", "Euro"), + changelogRow("+I", "Yen"), + changelogRow("-D", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("-D", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("-D", "Euro"), + changelogRow("+I", "Euro"), + changelogRow("-D", "Yen"), + changelogRow("+I", "HK Dollar"))); + + collectLatestLogAndCheck( + false, + false, + false, + "rate = 119", + Collections.singletonList("currency"), + Arrays.asList( + changelogRow("+I", "Euro"), + changelogRow("-D", "Euro"), + changelogRow("+I", "Euro"))); } @Test @@ -372,10 +1035,42 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+U", "Euro", 119L)); // currency as pk - collectLatestLogAndCheck(true, false, true, expected); + collectLatestLogAndCheck(true, false, true, null, Collections.emptyList(), expected); // without pk - collectLatestLogAndCheck(true, false, true, expected); + collectLatestLogAndCheck(true, false, true, null, Collections.emptyList(), expected); + + // test field filter + collectLatestLogAndCheck( + true, + false, + true, + "rate = 114", + Collections.emptyList(), + Arrays.asList(changelogRow("+I", "Euro", 114L), changelogRow("-U", "Euro", 114L))); + + // test projection + collectLatestLogAndCheck( + true, + false, + true, + null, + Collections.singletonList("rate"), + Arrays.asList( + changelogRow("+I", 102L), + changelogRow("+I", 114L), + changelogRow("+I", 1L), + changelogRow("-U", 114L), + changelogRow("+U", 119L))); + + // test projection and filter + collectLatestLogAndCheck( + true, + false, + true, + "rate = 114", + Collections.singletonList("currency"), + Arrays.asList(changelogRow("+I", "Euro"), changelogRow("-U", "Euro"))); } @Test @@ -391,6 +1086,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+I", "US Dollar", 102L, "2022-01-01"), changelogRow("+I", "Yen", 1L, "2022-01-01"), changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("-U", "US Dollar", 102L, "2022-01-01"), + changelogRow("+U", "US Dollar", 114L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 119L, "2022-01-02"))); @@ -456,10 +1153,13 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+I", "Euro", 119L, "2022-01-02"))); } - // ------------------------ Tools ---------------------------------- - private String collectAndCheckBatchReadWrite( - boolean partitioned, boolean hasPk, List<Row> expected) throws Exception { + boolean partitioned, + boolean hasPk, + @Nullable String filter, + List<String> projection, + List<Row> expected) + throws Exception { return collectAndCheckUnderSameEnv( false, false, @@ -468,6 +1168,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { hasPk, true, Collections.emptyMap(), + filter, + projection, expected) .f0; } @@ -477,31 +1179,60 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { boolean partitioned, boolean hasPk, Map<String, String> readHints, + @Nullable String filter, + List<String> projection, List<Row> expected) throws Exception { Tuple2<String, BlockingIterator<Row, Row>> tuple = collectAndCheckUnderSameEnv( - true, enableLogStore, false, partitioned, hasPk, true, readHints, expected); + true, + enableLogStore, + false, + partitioned, + hasPk, + true, + readHints, + filter, + projection, + expected); tuple.f1.close(); return tuple.f0; } private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckStreamingReadWriteWithoutClose( - Map<String, String> readHints, List<Row> expected) throws Exception { + Map<String, String> readHints, + @Nullable String filter, + List<String> projection, + List<Row> expected) + throws Exception { return collectAndCheckUnderSameEnv( - true, true, false, true, true, true, readHints, expected); + true, true, false, true, true, true, readHints, filter, projection, expected); } private void collectLatestLogAndCheck( - boolean insertOnly, boolean partitioned, boolean hasPk, List<Row> expected) + boolean insertOnly, + boolean partitioned, + boolean hasPk, + @Nullable String filter, + List<String> projection, + List<Row> expected) throws Exception { Map<String, String> hints = new HashMap<>(); hints.put( LOG_PREFIX + LogOptions.SCAN.key(), LogOptions.LogStartupMode.LATEST.name().toLowerCase()); collectAndCheckUnderSameEnv( - true, true, insertOnly, partitioned, hasPk, false, hints, expected) + true, + true, + insertOnly, + partitioned, + hasPk, + false, + hints, + filter, + projection, + expected) .f1 .close(); } @@ -517,7 +1248,16 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp"); hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp)); collectAndCheckUnderSameEnv( - true, true, insertOnly, partitioned, hasPk, true, hints, expected) + true, + true, + insertOnly, + partitioned, + hasPk, + true, + hints, + null, + Collections.emptyList(), + expected) .f1 .close(); } @@ -530,6 +1270,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { boolean hasPk, boolean writeFirst, Map<String, String> readHints, + @Nullable String filter, + List<String> projection, List<Row> expected) throws Exception { Map<String, String> tableOptions = new HashMap<>(); @@ -565,7 +1307,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { tEnv.executeSql(managedTableDdl); String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable); - String selectQuery = prepareSimpleSelectQuery(managedTable, readHints); + String selectQuery = prepareSelectQuery(managedTable, readHints, filter, projection); BlockingIterator<Row, Row> iterator; if (writeFirst) { @@ -621,11 +1363,13 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { Map<String, String> hints, List<Row> expectedRecords) throws Exception { - String selectQuery = prepareSimpleSelectQuery(managedTable, hints); List<Row> actual = new ArrayList<>(); BlockingIterator<Row, Row> iterator = - collect(tEnv, selectQuery, expectedRecords.size(), actual); - + collect( + tEnv, + prepareSimpleSelectQuery(managedTable, hints), + expectedRecords.size(), + actual); assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecords); return iterator; } @@ -659,20 +1403,26 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ") .append(String.format("`%s`", managedTableName)); + ddl.append(prepareOptions(tableOptions)) + .append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", sourceTableName)); + return ddl.toString(); + } + + private static String prepareOptions(Map<String, String> tableOptions) { + StringBuilder with = new StringBuilder(); if (tableOptions.size() > 0) { - ddl.append(" WITH (\n"); + with.append(" WITH (\n"); tableOptions.forEach( (k, v) -> - ddl.append(" ") + with.append(" ") .append(String.format("'%s'", k)) .append(" = ") .append(String.format("'%s',\n", v))); - int len = ddl.length(); - ddl.delete(len - 2, len); - ddl.append(")"); + int len = with.length(); + with.delete(len - 2, len); + with.append(")"); } - ddl.append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", sourceTableName)); - return ddl.toString(); + return with.toString(); } private static String prepareInsertIntoQuery(String sourceTableName, String managedTableName) { @@ -743,7 +1493,25 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { } private static String prepareSimpleSelectQuery(String tableName, Map<String, String> hints) { - return String.format("SELECT * FROM `%s` %s", tableName, buildHints(hints)); + return prepareSelectQuery(tableName, hints, null, Collections.emptyList()); + } + + private static String prepareSelectQuery( + String tableName, + Map<String, String> hints, + @Nullable String filter, + List<String> projections) { + StringBuilder queryBuilder = + new StringBuilder( + String.format( + "SELECT %s FROM `%s` %s", + projections.isEmpty() ? "*" : String.join(", ", projections), + tableName, + buildHints(hints))); + if (filter != null) { + queryBuilder.append("\nWHERE ").append(filter); + } + return queryBuilder.toString(); } private static String buildHints(Map<String, String> hints) { @@ -868,6 +1636,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+I", "Euro", 114L, "2022-01-01"), changelogRow("+I", "Yen", 1L, "2022-01-01"), changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "US Dollar", 114L, "2022-01-01"), // part = 2022-01-02 changelogRow("+I", "Euro", 119L, "2022-01-02")); } @@ -909,7 +1678,9 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { changelogRow("+I", "Euro", 119L), changelogRow("-U", "Euro", 119L), changelogRow("+U", "Euro", 119L), - changelogRow("-D", "Yen", 1L)); + changelogRow("-D", "Yen", 1L), + changelogRow("+I", null, 100L), + changelogRow("+I", "HK Dollar", null)); } private static List<Row> dailyRatesChangelogWithUB() { diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java index e17699f..7a51ce4 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java @@ -44,7 +44,7 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema private final TypeInformation<RowData> producedType; private final int fieldCount; private final int[] primaryKey; - @Nullable private final DeserializationSchema<RowData> keyDeserializer; + @Nullable private final DeserializationSchema<RowData> primaryKeyDeserializer; private final DeserializationSchema<RowData> valueDeserializer; private final RowData.FieldGetter[] keyFieldGetters; @Nullable private final int[][] projectFields; @@ -54,11 +54,11 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema public KafkaLogDeserializationSchema( DataType physicalType, int[] primaryKey, - @Nullable DeserializationSchema<RowData> keyDeserializer, + @Nullable DeserializationSchema<RowData> primaryKeyDeserializer, DeserializationSchema<RowData> valueDeserializer, @Nullable int[][] projectFields) { this.primaryKey = primaryKey; - this.keyDeserializer = keyDeserializer; + this.primaryKeyDeserializer = primaryKeyDeserializer; this.valueDeserializer = valueDeserializer; DataType projectedType = projectFields == null @@ -82,8 +82,8 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { - if (keyDeserializer != null) { - keyDeserializer.open(context); + if (primaryKeyDeserializer != null) { + primaryKeyDeserializer.open(context); } valueDeserializer.open(context); projectCollector = new ProjectCollector(); @@ -107,7 +107,7 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema Collector<RowData> collector = projectCollector.project(underCollector); if (primaryKey.length > 0 && record.value() == null) { - RowData key = keyDeserializer.deserialize(record.key()); + RowData key = primaryKeyDeserializer.deserialize(record.key()); GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount); for (int i = 0; i < primaryKey.length; i++) { value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key)); diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java index a5e787b..9904934 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java @@ -167,11 +167,11 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory { FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context); ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); DataType physicalType = schema.toPhysicalRowDataType(); - DeserializationSchema<RowData> keyDeserializer = null; + DeserializationSchema<RowData> primaryKeyDeserializer = null; int[] primaryKey = schema.getPrimaryKeyIndexes(); if (primaryKey.length > 0) { DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey); - keyDeserializer = + primaryKeyDeserializer = LogStoreTableFactory.getKeyDecodingFormat(helper) .createRuntimeDecoder(sourceContext, keyType); } @@ -183,7 +183,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory { toKafkaProperties(helper.getOptions()), physicalType, primaryKey, - keyDeserializer, + primaryKeyDeserializer, valueDeserializer, projectFields, helper.getOptions().get(CONSISTENCY),
