This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 22100a7 [FLINK-26857] Add ITCase for projection and filter predicate
22100a7 is described below
commit 22100a7493da6c41953f9020e84df2c1e6a6e94d
Author: Jane Chan <[email protected]>
AuthorDate: Fri Apr 1 18:34:20 2022 +0800
[FLINK-26857] Add ITCase for projection and filter predicate
This closes #69
---
.../store/connector/source/TableStoreSource.java | 4 +-
.../store/connector/ReadWriteTableITCase.java | 835 ++++++++++++++++++++-
.../store/kafka/KafkaLogDeserializationSchema.java | 12 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 6 +-
4 files changed, 813 insertions(+), 44 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 8de30f6..e1681fe 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -44,7 +44,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -165,8 +164,7 @@ public class TableStoreSource
fieldFilters = filters;
}
return Result.of(
- new ArrayList<>(filters),
- fieldFilters == null ? Collections.emptyList() : fieldFilters);
+ filters, streaming && logStoreTableFactory != null ? filters :
fieldFilters);
}
@Override
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 3458529..6ea0367 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -35,6 +35,8 @@ import org.apache.flink.types.Row;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,13 +66,15 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
List<Row> expectedRecords =
Arrays.asList(
// part = 2022-01-01
- changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
changelogRow("+I", "Yen", 1L, "2022-01-01"),
changelogRow("+I", "Euro", 114L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 119L, "2022-01-02"));
// test batch read
- String managedTable = collectAndCheckBatchReadWrite(true, true,
expectedRecords);
+ String managedTable =
+ collectAndCheckBatchReadWrite(
+ true, true, null, Collections.emptyList(),
expectedRecords);
checkFileStorePath(tEnv, managedTable);
// test streaming read
@@ -97,13 +101,80 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
Collections.emptyMap(),
Arrays.asList(
// part = 2022-01-01
- changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
+ changelogRow("+I", "US Dollar", 114L,
"2022-01-01"),
changelogRow("+I", "Yen", 1L, "2022-01-01"),
changelogRow("+I", "Euro", 114L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 100L, "2022-01-02"),
changelogRow("+I", "Yen", 1L, "2022-01-02")))
.close();
+
+ // test partition filter
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "US Dollar", 114L, "2022-01-01"));
+ collectAndCheckBatchReadWrite(
+ true, true, "dt <> '2022-01-02'", Collections.emptyList(),
expected);
+
+ collectAndCheckBatchReadWrite(
+ true, true, "dt = '2022-01-01'", Collections.emptyList(),
expected);
+
+ // test field filter
+ collectAndCheckBatchReadWrite(
+ true,
+ true,
+ "rate >= 100",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02")));
+
+ // test partition and field filter
+ collectAndCheckBatchReadWrite(
+ true,
+ true,
+ "rate >= 100 AND dt = '2022-01-02'",
+ Collections.emptyList(),
+ Collections.singletonList(changelogRow("+I", "Euro", 119L,
"2022-01-02")));
+
+ // test projection
+ collectAndCheckBatchReadWrite(
+ true,
+ true,
+ null,
+ Collections.singletonList("dt"),
+ Arrays.asList(
+ changelogRow("+I", "2022-01-01"), // US Dollar
+ changelogRow("+I", "2022-01-01"), // Yen
+ changelogRow("+I", "2022-01-01"), // Euro
+ changelogRow("+I", "2022-01-02"))); // Euro
+
+ collectAndCheckBatchReadWrite(
+ true,
+ true,
+ null,
+ Collections.singletonList("dt, currency, rate"),
+ Arrays.asList(
+ changelogRow("+I", "2022-01-01", "US Dollar", 114L),
// US Dollar
+ changelogRow("+I", "2022-01-01", "Yen", 1L), // Yen
+ changelogRow("+I", "2022-01-01", "Euro", 114L), // Euro
+ changelogRow("+I", "2022-01-02", "Euro", 119L))); //
Euro
+
+ // test projection and filter
+ collectAndCheckBatchReadWrite(
+ true,
+ true,
+ "rate = 114",
+ Collections.singletonList("rate"),
+ Arrays.asList(
+ changelogRow("+I", 114L), // US Dollar
+ changelogRow("+I", 114L) // Euro
+ ));
}
@Test
@@ -111,7 +182,9 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
// input is dailyRates()
// test batch read
- String managedTable = collectAndCheckBatchReadWrite(true, false,
dailyRates());
+ String managedTable =
+ collectAndCheckBatchReadWrite(
+ true, false, null, Collections.emptyList(),
dailyRates());
checkFileStorePath(tEnv, managedTable);
// overwrite dynamic partition
@@ -136,6 +209,52 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
// part = 2022-01-02
changelogRow("+I", "Yen", 2L, "2022-01-02")))
.close();
+
+ // test partition filter
+ collectAndCheckBatchReadWrite(
+ true, false, "dt >= '2022-01-01'", Collections.emptyList(),
dailyRates());
+
+ // test field filter
+ collectAndCheckBatchReadWrite(
+ true,
+ false,
+ "currency = 'US Dollar'",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 114L, "2022-01-01")));
+
+ // test partition and field filter
+ collectAndCheckBatchReadWrite(
+ true,
+ false,
+ "dt = '2022-01-01' OR rate > 115",
+ Collections.emptyList(),
+ dailyRates());
+
+ // test projection
+ collectAndCheckBatchReadWrite(
+ true,
+ false,
+ null,
+ Collections.singletonList("currency"),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar"),
+ changelogRow("+I", "US Dollar"),
+ changelogRow("+I", "Yen"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Euro")));
+
+ // test projection and filter
+ collectAndCheckBatchReadWrite(
+ true,
+ false,
+ "rate = 119",
+ Arrays.asList("currency", "dt"),
+ Collections.singletonList(changelogRow("+I", "Euro",
"2022-01-02")));
}
@Test
@@ -145,6 +264,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
collectAndCheckBatchReadWrite(
false,
true,
+ null,
+ Collections.emptyList(),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Yen", 1L),
@@ -161,13 +282,87 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
managedTable,
Collections.emptyMap(),
Collections.singletonList(changelogRow("+I", "Euro", 100L)));
+
+ // test field filter
+ collectAndCheckBatchReadWrite(
+ false,
+ true,
+ "currency = 'Euro'",
+ Collections.emptyList(),
+ Collections.singletonList(changelogRow("+I", "Euro", 119L)));
+
+ collectAndCheckBatchReadWrite(
+ false,
+ true,
+ "119 >= rate AND 102 < rate",
+ Collections.emptyList(),
+ Collections.singletonList(changelogRow("+I", "Euro", 119L)));
+
+ // test projection
+ collectAndCheckBatchReadWrite(
+ false,
+ true,
+ null,
+ Arrays.asList("rate", "currency"),
+ Arrays.asList(
+ changelogRow("+I", 102L, "US Dollar"),
+ changelogRow("+I", 1L, "Yen"),
+ changelogRow("+I", 119L, "Euro")));
+
+ // test projection and filter
+ collectAndCheckBatchReadWrite(
+ false,
+ true,
+ "currency = 'Yen'",
+ Collections.singletonList("rate"),
+ Collections.singletonList(changelogRow("+I", 1L)));
}
@Test
public void testBatchWriteNonPartitionedRecordsWithoutPk() throws
Exception {
// input is rates()
- String managedTable = collectAndCheckBatchReadWrite(false, false,
rates());
+ String managedTable =
+ collectAndCheckBatchReadWrite(false, false, null,
Collections.emptyList(), rates());
checkFileStorePath(tEnv, managedTable);
+
+ // test field filter
+ collectAndCheckBatchReadWrite(
+ false,
+ false,
+ "currency = 'Euro'",
+ Collections.emptyList(),
+ Arrays.asList(
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Euro", 119L)));
+
+ collectAndCheckBatchReadWrite(false, false, "rate >= 1",
Collections.emptyList(), rates());
+
+ // test projection
+ collectAndCheckBatchReadWrite(
+ false,
+ false,
+ null,
+ Collections.singletonList("currency"),
+ Arrays.asList(
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Yen"),
+ changelogRow("+I", "US Dollar")));
+
+ // test projection and filter
+ collectAndCheckBatchReadWrite(
+ false,
+ false,
+ "rate > 100 OR currency = 'Yen'",
+ Collections.singletonList("currency"),
+ Arrays.asList(
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Yen"),
+ changelogRow("+I", "US Dollar")));
}
@Test
@@ -177,6 +372,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
Tuple2<String, BlockingIterator<Row, Row>> tuple =
collectAndCheckStreamingReadWriteWithoutClose(
Collections.emptyMap(),
+ "dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency
= 'HK Dollar'",
+ Collections.emptyList(),
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
@@ -186,6 +383,27 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
checkFileStorePath(tEnv, managedTable);
BlockingIterator<Row, Row> streamIter = tuple.f1;
+ // test log store in hybrid mode accepts all filters
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO `%s` PARTITION (dt =
'2022-01-03')\n"
+ + "VALUES('HK Dollar', 100), ('Yen',
20)\n",
+ managedTable))
+ .await();
+
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO `%s` PARTITION (dt =
'2022-01-04')\n"
+ + "VALUES('Yen', 20)\n",
+ managedTable))
+ .await();
+
+ assertThat(streamIter.collect(2, 10, TimeUnit.SECONDS))
+ .containsExactlyInAnyOrderElementsOf(
+ Arrays.asList(
+ changelogRow("+I", "HK Dollar", 100L,
"2022-01-03"),
+ changelogRow("+I", "Yen", 20L, "2022-01-03")));
+
// overwrite partition 2022-01-02
prepareEnvAndOverwrite(
managedTable,
@@ -205,10 +423,55 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 100L, "2022-01-02"),
- changelogRow("+I", "Yen", 1L, "2022-01-02")));
+ changelogRow("+I", "Yen", 1L, "2022-01-02"),
+ // part = 2022-01-03
+ changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
+ changelogRow("+I", "Yen", 20L, "2022-01-03"),
+ // part = 2022-01-04
+ changelogRow("+I", "Yen", 20L, "2022-01-04")));
// check no changelog generated for streaming read
assertNoMoreRecords(streamIter);
+
+ // filter on partition
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ true,
+ Collections.emptyMap(),
+ "dt = '2022-01-01'",
+ Collections.emptyList(),
+ Collections.singletonList(changelogRow("+I", "US Dollar",
102L, "2022-01-01")));
+
+ // test field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ true,
+ Collections.emptyMap(),
+ "currency = 'US Dollar'",
+ Collections.emptyList(),
+ Collections.singletonList(changelogRow("+I", "US Dollar",
102L, "2022-01-01")));
+
+ // test partition and field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ true,
+ Collections.emptyMap(),
+ "dt = '2022-01-01' AND rate = 1",
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // test projection and filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ true,
+ Collections.emptyMap(),
+ "dt = '2022-01-02' AND currency = 'Euro'",
+ Arrays.asList("rate", "dt", "currency"),
+ Collections.singletonList(changelogRow("+I", 119L,
"2022-01-02", "Euro")));
}
@Test
@@ -223,11 +486,61 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
true,
true,
Collections.emptyMap(),
+ null,
+ Collections.emptyList(),
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 119L,
"2022-01-02"))));
+
+ // test partition filter
+ collectAndCheckStreamingReadWriteWithClose(
+ false,
+ true,
+ true,
+ Collections.emptyMap(),
+ "dt < '2022-01-02'",
+ Collections.emptyList(),
+ Collections.singletonList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+
+ // test field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ false,
+ true,
+ true,
+ Collections.emptyMap(),
+ "rate = 102",
+ Collections.emptyList(),
+ Collections.singletonList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+
+ // test partition and field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ false,
+ true,
+ true,
+ Collections.emptyMap(),
+ "rate = 102 or dt < '2022-01-02'",
+ Collections.emptyList(),
+ Collections.singletonList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01")));
+
+ // test projection and filter
+ collectAndCheckStreamingReadWriteWithClose(
+ false,
+ true,
+ true,
+ Collections.emptyMap(),
+ "rate = 102 or dt < '2022-01-02'",
+ Collections.singletonList("currency"),
+ Collections.singletonList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar")));
}
@Test
@@ -241,11 +554,86 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
true,
false,
Collections.emptyMap(),
+ null,
+ Collections.emptyList(),
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 115L,
"2022-01-02"))));
+
+ // test partition filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ "dt IS NOT NULL",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 115L, "2022-01-02")));
+
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ "dt IS NULL",
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // test field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ "currency = 'US Dollar' OR rate = 115",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 115L, "2022-01-02")));
+
+ // test partition and field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ "(dt = '2022-01-02' AND currency = 'US Dollar') OR (dt =
'2022-01-01' AND rate = 115)",
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // test projection
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ null,
+ Collections.singletonList("rate"),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", 102L),
+ // part = 2022-01-02
+ changelogRow("+I", 115L)));
+
+ // test projection and filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ "dt <> '2022-01-01'",
+ Collections.singletonList("rate"),
+ Collections.singletonList(
+ // part = 2022-01-02, Euro
+ changelogRow("+I", 115L)));
}
@Test
@@ -259,9 +647,41 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
false,
true,
Collections.emptyMap(),
+ null,
+ Collections.emptyList(),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 119L))));
+
+ // test field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ true,
+ Collections.emptyMap(),
+ "currency = 'Yen'",
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // test projection
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ true,
+ Collections.emptyMap(),
+ null,
+ Collections.singletonList("currency"),
+ Arrays.asList(changelogRow("+I", "US Dollar"),
changelogRow("+I", "Euro")));
+
+ // test projection and filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ true,
+ Collections.emptyMap(),
+ "rate = 102",
+ Collections.singletonList("currency"),
+ Collections.singletonList(changelogRow("+I", "US Dollar")));
}
@Test
@@ -275,9 +695,51 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
false,
false,
Collections.emptyMap(),
+ null,
+ Collections.emptyList(),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
- changelogRow("+I", "Euro", 119L))));
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("+I", null, 100L),
+ changelogRow("+I", "HK Dollar", null))));
+
+ // test field filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ false,
+ Collections.emptyMap(),
+ "currency IS NOT NULL",
+ Collections.emptyList(),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("+I", "HK Dollar", null)));
+
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ false,
+ Collections.emptyMap(),
+ "rate IS NOT NULL",
+ Collections.emptyList(),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("+I", null, 100L)));
+
+ // test projection and filter
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ false,
+ Collections.emptyMap(),
+ "currency IS NOT NULL AND rate is NOT NULL",
+ Collections.singletonList("rate"),
+ Arrays.asList(
+ changelogRow("+I", 102L), // US Dollar
+ changelogRow("+I", 119L)) // Euro
+ );
}
@Test
@@ -287,6 +749,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
false,
true,
true,
+ null,
+ Collections.emptyList(),
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
@@ -298,6 +762,86 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("-D", "Euro", 116L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 119L, "2022-01-02")));
+
+ // test partition filter
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
+ "dt = '2022-01-01'",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-U", "Euro", 114L, "2022-01-01"),
+ changelogRow("+U", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01")));
+
+ // test field filter
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
+ "currency = 'Yen'",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01")));
+
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
+ "rate = 114",
+ Collections.emptyList(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("-U", "Euro", 114L, "2022-01-01")));
+
+ // test partition and field filter
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
+ "rate = 114 AND dt = '2022-01-02'",
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // test projection
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
+ null,
+ Collections.singletonList("rate"),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", 102L), // US Dollar
+ changelogRow("+I", 114L), // Euro
+ changelogRow("+I", 1L), // Yen
+ changelogRow("-U", 114L), // Euro
+ changelogRow("+U", 116L), // Euro
+ changelogRow("-D", 1L), // Yen
+ changelogRow("-D", 116L), // Euro
+ // part = 2022-01-02
+ changelogRow("+I", 119L) // Euro
+ ));
+
+ // test projection and filter
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
+ "dt = '2022-01-02'",
+ Collections.singletonList("rate"),
+ Collections.singletonList(
+ // part = 2022-01-02, Euro
+ changelogRow("+I", 119L)));
}
@Test
@@ -307,6 +851,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
false,
true,
false,
+ null,
+ Collections.emptyList(),
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
@@ -329,6 +875,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
false,
false,
true,
+ null,
+ Collections.emptyList(),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 114L),
@@ -338,6 +886,49 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("-D", "Euro", 116L),
changelogRow("+I", "Euro", 119L),
changelogRow("-D", "Yen", 1L)));
+
+ // test field filter
+ collectLatestLogAndCheck(
+ false,
+ false,
+ true,
+ "currency = 'Euro'",
+ Collections.emptyList(),
+ Arrays.asList(
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("-U", "Euro", 114L),
+ changelogRow("+U", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L)));
+
+ // test projection
+ collectLatestLogAndCheck(
+ false,
+ false,
+ true,
+ null,
+ Collections.singletonList("currency"),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Yen"),
+ changelogRow("-D", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("-D", "Yen")));
+
+ // test projection and filter
+ collectLatestLogAndCheck(
+ false,
+ false,
+ true,
+ "currency = 'Euro'",
+ Collections.singletonList("rate"),
+ Arrays.asList(
+ changelogRow("+I", 114L),
+ changelogRow("-U", 114L),
+ changelogRow("+U", 116L),
+ changelogRow("-D", 116L),
+ changelogRow("+I", 119L)));
}
@Test
@@ -347,6 +938,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
false,
false,
false,
+ null,
+ Collections.emptyList(),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 114L),
@@ -357,7 +950,77 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("+I", "Euro", 119L),
changelogRow("-D", "Euro", 119L),
changelogRow("+I", "Euro", 119L),
- changelogRow("-D", "Yen", 1L)));
+ changelogRow("-D", "Yen", 1L),
+ changelogRow("+I", null, 100L),
+ changelogRow("+I", "HK Dollar", null)));
+
+ // test field filter
+ collectLatestLogAndCheck(
+ false,
+ false,
+ false,
+ "currency = 'Euro'",
+ Collections.emptyList(),
+ Arrays.asList(
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("-D", "Euro", 114L),
+ changelogRow("+I", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-D", "Euro", 119L),
+ changelogRow("+I", "Euro", 119L)));
+
+ // test projection
+ collectLatestLogAndCheck(
+ false,
+ false,
+ false,
+ null,
+ Arrays.asList("currency", "rate"),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("-D", "Euro", 114L),
+ changelogRow("+I", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-D", "Euro", 119L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-D", "Yen", 1L),
+ changelogRow("+I", null, 100L),
+ changelogRow("+I", "HK Dollar", null)));
+
+ // test projection and filter
+ collectLatestLogAndCheck(
+ false,
+ false,
+ false,
+ "currency IS NOT NULL",
+ Collections.singletonList("currency"),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar"),
+ changelogRow("+I", "Euro"),
+ changelogRow("+I", "Yen"),
+ changelogRow("-D", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("-D", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("-D", "Euro"),
+ changelogRow("+I", "Euro"),
+ changelogRow("-D", "Yen"),
+ changelogRow("+I", "HK Dollar")));
+
+ collectLatestLogAndCheck(
+ false,
+ false,
+ false,
+ "rate = 119",
+ Collections.singletonList("currency"),
+ Arrays.asList(
+ changelogRow("+I", "Euro"),
+ changelogRow("-D", "Euro"),
+ changelogRow("+I", "Euro")));
}
@Test
@@ -372,10 +1035,42 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("+U", "Euro", 119L));
// currency as pk
- collectLatestLogAndCheck(true, false, true, expected);
+ collectLatestLogAndCheck(true, false, true, null,
Collections.emptyList(), expected);
// without pk
- collectLatestLogAndCheck(true, false, true, expected);
+ collectLatestLogAndCheck(true, false, true, null,
Collections.emptyList(), expected);
+
+ // test field filter
+ collectLatestLogAndCheck(
+ true,
+ false,
+ true,
+ "rate = 114",
+ Collections.emptyList(),
+ Arrays.asList(changelogRow("+I", "Euro", 114L),
changelogRow("-U", "Euro", 114L)));
+
+ // test projection
+ collectLatestLogAndCheck(
+ true,
+ false,
+ true,
+ null,
+ Collections.singletonList("rate"),
+ Arrays.asList(
+ changelogRow("+I", 102L),
+ changelogRow("+I", 114L),
+ changelogRow("+I", 1L),
+ changelogRow("-U", 114L),
+ changelogRow("+U", 119L)));
+
+ // test projection and filter
+ collectLatestLogAndCheck(
+ true,
+ false,
+ true,
+ "rate = 114",
+ Collections.singletonList("currency"),
+ Arrays.asList(changelogRow("+I", "Euro"), changelogRow("-U",
"Euro")));
}
@Test
@@ -391,6 +1086,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
changelogRow("+I", "Yen", 1L, "2022-01-01"),
changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("-U", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+U", "US Dollar", 114L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 119L, "2022-01-02")));
@@ -456,10 +1153,13 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
changelogRow("+I", "Euro", 119L, "2022-01-02")));
}
- // ------------------------ Tools ----------------------------------
-
private String collectAndCheckBatchReadWrite(
- boolean partitioned, boolean hasPk, List<Row> expected) throws
Exception {
+ boolean partitioned,
+ boolean hasPk,
+ @Nullable String filter,
+ List<String> projection,
+ List<Row> expected)
+ throws Exception {
return collectAndCheckUnderSameEnv(
false,
false,
@@ -468,6 +1168,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
hasPk,
true,
Collections.emptyMap(),
+ filter,
+ projection,
expected)
.f0;
}
@@ -477,31 +1179,60 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
boolean partitioned,
boolean hasPk,
Map<String, String> readHints,
+ @Nullable String filter,
+ List<String> projection,
List<Row> expected)
throws Exception {
Tuple2<String, BlockingIterator<Row, Row>> tuple =
collectAndCheckUnderSameEnv(
- true, enableLogStore, false, partitioned, hasPk, true,
readHints, expected);
+ true,
+ enableLogStore,
+ false,
+ partitioned,
+ hasPk,
+ true,
+ readHints,
+ filter,
+ projection,
+ expected);
tuple.f1.close();
return tuple.f0;
}
private Tuple2<String, BlockingIterator<Row, Row>>
collectAndCheckStreamingReadWriteWithoutClose(
- Map<String, String> readHints, List<Row> expected) throws
Exception {
+ Map<String, String> readHints,
+ @Nullable String filter,
+ List<String> projection,
+ List<Row> expected)
+ throws Exception {
return collectAndCheckUnderSameEnv(
- true, true, false, true, true, true, readHints, expected);
+ true, true, false, true, true, true, readHints, filter,
projection, expected);
}
private void collectLatestLogAndCheck(
- boolean insertOnly, boolean partitioned, boolean hasPk, List<Row>
expected)
+ boolean insertOnly,
+ boolean partitioned,
+ boolean hasPk,
+ @Nullable String filter,
+ List<String> projection,
+ List<Row> expected)
throws Exception {
Map<String, String> hints = new HashMap<>();
hints.put(
LOG_PREFIX + LogOptions.SCAN.key(),
LogOptions.LogStartupMode.LATEST.name().toLowerCase());
collectAndCheckUnderSameEnv(
- true, true, insertOnly, partitioned, hasPk, false,
hints, expected)
+ true,
+ true,
+ insertOnly,
+ partitioned,
+ hasPk,
+ false,
+ hints,
+ filter,
+ projection,
+ expected)
.f1
.close();
}
@@ -517,7 +1248,16 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp");
hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(),
String.valueOf(timestamp));
collectAndCheckUnderSameEnv(
- true, true, insertOnly, partitioned, hasPk, true,
hints, expected)
+ true,
+ true,
+ insertOnly,
+ partitioned,
+ hasPk,
+ true,
+ hints,
+ null,
+ Collections.emptyList(),
+ expected)
.f1
.close();
}
@@ -530,6 +1270,8 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
boolean hasPk,
boolean writeFirst,
Map<String, String> readHints,
+ @Nullable String filter,
+ List<String> projection,
List<Row> expected)
throws Exception {
Map<String, String> tableOptions = new HashMap<>();
@@ -565,7 +1307,7 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
tEnv.executeSql(managedTableDdl);
String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
- String selectQuery = prepareSimpleSelectQuery(managedTable, readHints);
+ String selectQuery = prepareSelectQuery(managedTable, readHints,
filter, projection);
BlockingIterator<Row, Row> iterator;
if (writeFirst) {
@@ -621,11 +1363,13 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
Map<String, String> hints,
List<Row> expectedRecords)
throws Exception {
- String selectQuery = prepareSimpleSelectQuery(managedTable, hints);
List<Row> actual = new ArrayList<>();
BlockingIterator<Row, Row> iterator =
- collect(tEnv, selectQuery, expectedRecords.size(), actual);
-
+ collect(
+ tEnv,
+ prepareSimpleSelectQuery(managedTable, hints),
+ expectedRecords.size(),
+ actual);
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecords);
return iterator;
}
@@ -659,20 +1403,26 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
StringBuilder ddl =
new StringBuilder("CREATE TABLE IF NOT EXISTS ")
.append(String.format("`%s`", managedTableName));
+ ddl.append(prepareOptions(tableOptions))
+ .append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n",
sourceTableName));
+ return ddl.toString();
+ }
+
+ private static String prepareOptions(Map<String, String> tableOptions) {
+ StringBuilder with = new StringBuilder();
if (tableOptions.size() > 0) {
- ddl.append(" WITH (\n");
+ with.append(" WITH (\n");
tableOptions.forEach(
(k, v) ->
- ddl.append(" ")
+ with.append(" ")
.append(String.format("'%s'", k))
.append(" = ")
.append(String.format("'%s',\n", v)));
- int len = ddl.length();
- ddl.delete(len - 2, len);
- ddl.append(")");
+ int len = with.length();
+ with.delete(len - 2, len);
+ with.append(")");
}
- ddl.append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n",
sourceTableName));
- return ddl.toString();
+ return with.toString();
}
private static String prepareInsertIntoQuery(String sourceTableName,
String managedTableName) {
@@ -743,7 +1493,25 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
}
private static String prepareSimpleSelectQuery(String tableName, Map<String, String> hints) {
- return String.format("SELECT * FROM `%s` %s", tableName, buildHints(hints));
+ return prepareSelectQuery(tableName, hints, null, Collections.emptyList());
+ }
+
+ private static String prepareSelectQuery(
+ String tableName,
+ Map<String, String> hints,
+ @Nullable String filter,
+ List<String> projections) {
+ StringBuilder queryBuilder =
+ new StringBuilder(
+ String.format(
+ "SELECT %s FROM `%s` %s",
projections.isEmpty() ? "*" : String.join(", ", projections),
+ tableName,
+ buildHints(hints)));
+ if (filter != null) {
+ queryBuilder.append("\nWHERE ").append(filter);
+ }
+ return queryBuilder.toString();
}
private static String buildHints(Map<String, String> hints) {
@@ -868,6 +1636,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
changelogRow("+I", "Euro", 114L, "2022-01-01"),
changelogRow("+I", "Yen", 1L, "2022-01-01"),
changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", "Euro", 119L, "2022-01-02"));
}
@@ -909,7 +1678,9 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
changelogRow("+I", "Euro", 119L),
changelogRow("-U", "Euro", 119L),
changelogRow("+U", "Euro", 119L),
- changelogRow("-D", "Yen", 1L));
+ changelogRow("-D", "Yen", 1L),
+ changelogRow("+I", null, 100L),
+ changelogRow("+I", "HK Dollar", null));
}
private static List<Row> dailyRatesChangelogWithUB() {
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
index e17699f..7a51ce4 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
@@ -44,7 +44,7 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema
private final TypeInformation<RowData> producedType;
private final int fieldCount;
private final int[] primaryKey;
- @Nullable private final DeserializationSchema<RowData> keyDeserializer;
+ @Nullable private final DeserializationSchema<RowData> primaryKeyDeserializer;
private final DeserializationSchema<RowData> valueDeserializer;
private final RowData.FieldGetter[] keyFieldGetters;
@Nullable private final int[][] projectFields;
@@ -54,11 +54,11 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema
public KafkaLogDeserializationSchema(
DataType physicalType,
int[] primaryKey,
- @Nullable DeserializationSchema<RowData> keyDeserializer,
+ @Nullable DeserializationSchema<RowData> primaryKeyDeserializer,
DeserializationSchema<RowData> valueDeserializer,
@Nullable int[][] projectFields) {
this.primaryKey = primaryKey;
- this.keyDeserializer = keyDeserializer;
+ this.primaryKeyDeserializer = primaryKeyDeserializer;
this.valueDeserializer = valueDeserializer;
DataType projectedType =
projectFields == null
@@ -82,8 +82,8 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema
@Override
public void open(DeserializationSchema.InitializationContext context)
throws Exception {
- if (keyDeserializer != null) {
- keyDeserializer.open(context);
+ if (primaryKeyDeserializer != null) {
+ primaryKeyDeserializer.open(context);
}
valueDeserializer.open(context);
projectCollector = new ProjectCollector();
@@ -107,7 +107,7 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema
Collector<RowData> collector =
projectCollector.project(underCollector);
if (primaryKey.length > 0 && record.value() == null) {
- RowData key = keyDeserializer.deserialize(record.key());
+ RowData key = primaryKeyDeserializer.deserialize(record.key());
GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount);
for (int i = 0; i < primaryKey.length; i++) {
value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key));
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index a5e787b..9904934 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -167,11 +167,11 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
- DeserializationSchema<RowData> keyDeserializer = null;
+ DeserializationSchema<RowData> primaryKeyDeserializer = null;
int[] primaryKey = schema.getPrimaryKeyIndexes();
if (primaryKey.length > 0) {
DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
- keyDeserializer =
+ primaryKeyDeserializer =
LogStoreTableFactory.getKeyDecodingFormat(helper)
.createRuntimeDecoder(sourceContext, keyType);
}
@@ -183,7 +183,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
toKafkaProperties(helper.getOptions()),
physicalType,
primaryKey,
- keyDeserializer,
+ primaryKeyDeserializer,
valueDeserializer,
projectFields,
helper.getOptions().get(CONSISTENCY),