This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d1a8b6f [FLINK-26879] Fix that LogStartupMode.LATEST and
LogStartupMode.FROM_TIMESTAMP don't work
d1a8b6f is described below
commit d1a8b6fe2113654f3ba107969b3373ae459b4c5d
Author: Jane Chan <[email protected]>
AuthorDate: Wed Mar 30 14:43:42 2022 +0800
[FLINK-26879] Fix that LogStartupMode.LATEST and
LogStartupMode.FROM_TIMESTAMP don't work
This closes #65
---
flink-table-store-connector/pom.xml | 8 +-
.../flink/table/store/connector/TableStore.java | 37 +-
.../store/connector/source/TableStoreSource.java | 1 -
.../store/connector/ReadWriteTableITCase.java | 427 +++++++++++++++++----
flink-table-store-kafka/pom.xml | 2 +-
.../table/store/kafka/KafkaLogSourceProvider.java | 10 +-
6 files changed, 380 insertions(+), 105 deletions(-)
diff --git a/flink-table-store-connector/pom.xml
b/flink-table-store-connector/pom.xml
index 536befb..dfde4ed 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -212,7 +212,7 @@ under the License.
<!-- include 2.0 server for tests -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>2.4.1</version>
+ <version>2.8.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -239,6 +239,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>2.8.1</version>
+ </dependency>
+
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 9374933..d222018 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -227,8 +227,6 @@ public class TableStore {
private boolean isContinuous = false;
- private boolean isHybrid = true;
-
@Nullable private int[][] projectedFields;
@Nullable private Predicate partitionPredicate;
@@ -257,11 +255,6 @@ public class TableStore {
return this;
}
- public SourceBuilder withHybridMode(boolean isHybrid) {
- this.isHybrid = isHybrid;
- return this;
- }
-
public SourceBuilder withLogSourceProvider(LogSourceProvider
logSourceProvider) {
this.logSourceProvider = logSourceProvider;
return this;
@@ -271,20 +264,14 @@ public class TableStore {
return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
}
- private FileStoreSource buildFileSource(boolean isContinuous) {
- FileStore fileStore = buildFileStore();
-
- boolean latestContinuous = false;
- if (isContinuous) {
- LogStartupMode startupMode = logOptions().get(SCAN);
- latestContinuous = startupMode == LogStartupMode.LATEST;
- }
+ private FileStoreSource buildFileSource(
+ boolean isContinuous, boolean continuousScanLatest) {
return new FileStoreSource(
- fileStore,
+ buildFileStore(),
primaryKeys.length == 0,
isContinuous,
discoveryIntervalMills(),
- latestContinuous,
+ continuousScanLatest,
projectedFields,
partitionPredicate,
fieldPredicate);
@@ -292,22 +279,22 @@ public class TableStore {
public Source<RowData, ?, ?> build() {
if (isContinuous) {
+ LogStartupMode startupMode = logOptions().get(SCAN);
if (logSourceProvider == null) {
- return buildFileSource(true);
- }
-
- if (isHybrid) {
+ return buildFileSource(true, startupMode ==
LogStartupMode.LATEST);
+ } else {
+ if (startupMode != LogStartupMode.FULL) {
+ return logSourceProvider.createSource(null);
+ }
return HybridSource.<RowData,
StaticFileStoreSplitEnumerator>builder(
- buildFileSource(false))
+ buildFileSource(false, false))
.addSource(
new
LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
.build();
- } else {
- return logSourceProvider.createSource(null);
}
} else {
- return buildFileSource(false);
+ return buildFileSource(false, false);
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 11da93e..8de30f6 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -135,7 +135,6 @@ public class TableStoreSource
tableStore
.sourceBuilder()
.withContinuousMode(streaming)
- .withHybridMode(streaming && logSourceProvider != null)
.withLogSourceProvider(logSourceProvider)
.withProjection(projectFields)
.withPartitionPredicate(PredicateConverter.convert(partitionFilters))
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 86ec5dc..3458529 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
@@ -29,6 +30,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.types.Row;
import org.junit.Test;
@@ -58,8 +60,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
@Test
public void testBatchWriteWithPartitionedRecordsWithPk() throws Exception {
- String managedTable = prepareEnvAndWrite(false, false, true, true);
-
// input is dailyRates()
List<Row> expectedRecords =
Arrays.asList(
@@ -70,7 +70,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
// part = 2022-01-02
changelogRow("+I", "Euro", 119L, "2022-01-02"));
// test batch read
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ String managedTable = collectAndCheckBatchReadWrite(true, true,
expectedRecords);
checkFileStorePath(tEnv, managedTable);
// test streaming read
@@ -91,22 +91,27 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
assertNoMoreRecords(streamIter);
// batch read to check partition refresh
- expectedRecords = new ArrayList<>(expectedRecords);
- expectedRecords.remove(3);
- expectedRecords.add(changelogRow("+I", "Euro", 100L, "2022-01-02"));
- expectedRecords.add(changelogRow("+I", "Yen", 1L, "2022-01-02"));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ collectAndCheck(
+ tEnv,
+ managedTable,
+ Collections.emptyMap(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 100L, "2022-01-02"),
+ changelogRow("+I", "Yen", 1L, "2022-01-02")))
+ .close();
}
@Test
public void testBatchWriteWithPartitionedRecordsWithoutPk() throws
Exception {
- String managedTable = prepareEnvAndWrite(false, false, true, false);
-
// input is dailyRates()
- List<Row> expectedRecords = dailyRates();
// test batch read
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ String managedTable = collectAndCheckBatchReadWrite(true, false,
dailyRates());
checkFileStorePath(tEnv, managedTable);
// overwrite dynamic partition
@@ -135,15 +140,15 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
@Test
public void testBatchWriteWithNonPartitionedRecordsWithPk() throws
Exception {
- String managedTable = prepareEnvAndWrite(false, false, false, true);
-
// input is rates()
- List<Row> expectedRecords =
- Arrays.asList(
- changelogRow("+I", "US Dollar", 102L),
- changelogRow("+I", "Yen", 1L),
- changelogRow("+I", "Euro", 119L));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ String managedTable =
+ collectAndCheckBatchReadWrite(
+ false,
+ true,
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("+I", "Euro", 119L)));
checkFileStorePath(tEnv, managedTable);
// overwrite the whole table
@@ -151,37 +156,35 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
managedTable,
Collections.emptyMap(),
Collections.singletonList(new String[] {"'Euro'", "100"}));
- expectedRecords = new ArrayList<>(expectedRecords);
- expectedRecords.clear();
- expectedRecords.add(changelogRow("+I", "Euro", 100L));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ collectAndCheck(
+ tEnv,
+ managedTable,
+ Collections.emptyMap(),
+ Collections.singletonList(changelogRow("+I", "Euro", 100L)));
}
@Test
public void testBatchWriteNonPartitionedRecordsWithoutPk() throws
Exception {
- String managedTable = prepareEnvAndWrite(false, false, false, false);
-
// input is rates()
- List<Row> expectedRecords = rates();
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ String managedTable = collectAndCheckBatchReadWrite(false, false,
rates());
checkFileStorePath(tEnv, managedTable);
}
@Test
public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk()
throws Exception {
- String managedTable = prepareEnvAndWrite(true, true, true, true);
-
// input is dailyRatesChangelogWithoutUB()
// test hybrid read
- List<Row> expectedRecords =
- Arrays.asList(
- // part = 2022-01-01
- changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
- // part = 2022-01-02
- changelogRow("+I", "Euro", 119L, "2022-01-02"));
- BlockingIterator<Row, Row> streamIter =
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords);
+ Tuple2<String, BlockingIterator<Row, Row>> tuple =
+ collectAndCheckStreamingReadWriteWithoutClose(
+ Collections.emptyMap(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L,
"2022-01-02")));
+ String managedTable = tuple.f0;
checkFileStorePath(tEnv, managedTable);
+ BlockingIterator<Row, Row> streamIter = tuple.f1;
// overwrite partition 2022-01-02
prepareEnvAndOverwrite(
@@ -210,64 +213,324 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
@Test
public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk()
throws Exception {
- String managedTable = prepareEnvAndWrite(true, false, true, true);
-
// input is dailyRatesChangelogWithoutUB()
// file store continuous read
// will not merge, at least collect two records
- List<Row> expectedRecords =
+ checkFileStorePath(
+ tEnv,
+ collectAndCheckStreamingReadWriteWithClose(
+ false,
+ true,
+ true,
+ Collections.emptyMap(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L,
"2022-01-02"))));
+ }
+
+ @Test
+ public void testStreamingReadWritePartitionedRecordsWithoutPk() throws
Exception {
+ // input is dailyRatesChangelogWithUB()
+ // enable log store, file store bounded read with merge
+ checkFileStorePath(
+ tEnv,
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ true,
+ false,
+ Collections.emptyMap(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L,
"2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 115L,
"2022-01-02"))));
+ }
+
+ @Test
+ public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws
Exception {
+ // input is ratesChangelogWithoutUB()
+ // enable log store, file store bounded read with merge
+ checkFileStorePath(
+ tEnv,
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ true,
+ Collections.emptyMap(),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 119L))));
+ }
+
+ @Test
+ public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws
Exception {
+ // input is ratesChangelogWithUB()
+ // enable log store, with default full scan mode, will merge
+ checkFileStorePath(
+ tEnv,
+ collectAndCheckStreamingReadWriteWithClose(
+ true,
+ false,
+ false,
+ Collections.emptyMap(),
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 119L))));
+ }
+
+ @Test
+ public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws
Exception {
+ // input is dailyRatesChangelogWithoutUB()
+ collectLatestLogAndCheck(
+ false,
+ true,
+ true,
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-U", "Euro", 114L, "2022-01-01"),
+ changelogRow("+U", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01"),
// part = 2022-01-02
- changelogRow("+I", "Euro", 119L, "2022-01-02"));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ changelogRow("+I", "Euro", 119L, "2022-01-02")));
}
@Test
- public void testStreamingReadWritePartitionedRecordsWithoutPk() throws
Exception {
- String managedTable = prepareEnvAndWrite(true, true, true, false);
-
+ public void testReadLatestChangelogOfPartitionedRecordsWithoutPk() throws
Exception {
// input is dailyRatesChangelogWithUB()
- // enable log store, file store bounded read with merge
- List<Row> expectedRecords =
+ collectLatestLogAndCheck(
+ false,
+ true,
+ false,
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
// part = 2022-01-02
- changelogRow("+I", "Euro", 115L, "2022-01-02"));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ changelogRow("+I", "Euro", 114L, "2022-01-02"),
+ changelogRow("-D", "Euro", 114L, "2022-01-02"),
+ changelogRow("+I", "Euro", 119L, "2022-01-02"),
+ changelogRow("-D", "Euro", 119L, "2022-01-02"),
+ changelogRow("+I", "Euro", 115L, "2022-01-02")));
}
@Test
- public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws
Exception {
- String managedTable = prepareEnvAndWrite(true, true, false, true);
-
+ public void testReadLatestChangelogOfNonPartitionedRecordsWithPk() throws
Exception {
// input is ratesChangelogWithoutUB()
- // enable log store, file store bounded read with merge
- List<Row> expectedRecords =
+ collectLatestLogAndCheck(
+ false,
+ false,
+ true,
Arrays.asList(
- changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 119L));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("-U", "Euro", 114L),
+ changelogRow("+U", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-D", "Yen", 1L)));
}
@Test
- public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws
Exception {
- String managedTable = prepareEnvAndWrite(true, true, false, false);
-
+ public void testReadLatestChangelogOfNonPartitionedRecordsWithoutPk()
throws Exception {
// input is ratesChangelogWithUB()
- // enable log store, with default full scan mode, will merge
- List<Row> expectedRecords =
+ collectLatestLogAndCheck(
+ false,
+ false,
+ false,
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("-D", "Euro", 114L),
+ changelogRow("+I", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-D", "Euro", 119L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-D", "Yen", 1L)));
+ }
+
+ @Test
+ public void testReadLatestChangelogOfInsertOnlyRecords() throws Exception {
+ // input is rates()
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("-U", "Euro", 114L),
+ changelogRow("+U", "Euro", 119L));
+
+ // currency as pk
+ collectLatestLogAndCheck(true, false, true, expected);
+
+ // without pk
+ collectLatestLogAndCheck(true, false, true, expected);
+ }
+
+ @Test
+ public void testReadInsertOnlyChangelogFromTimestamp() throws Exception {
+ // input is dailyRates()
+ collectChangelogFromTimestampAndCheck(
+ true,
+ true,
+ true,
+ 0,
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02")));
+
+ collectChangelogFromTimestampAndCheck(true, true, false, 0,
dailyRates());
+
+ // input is rates()
+ collectChangelogFromTimestampAndCheck(
+ true,
+ false,
+ true,
+ 0,
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("-U", "Euro", 114L),
+ changelogRow("+U", "Euro", 119L)));
+
+ collectChangelogFromTimestampAndCheck(true, false, false, 0, rates());
+
+ collectChangelogFromTimestampAndCheck(
+ true, false, false, Long.MAX_VALUE - 1,
Collections.emptyList());
+ }
+
+ @Test
+ public void testReadRetractChangelogFromTimestamp() throws Exception {
+ // input is dailyRatesChangelogWithUB()
+ collectChangelogFromTimestampAndCheck(
+ false,
+ true,
+ false,
+ 0,
Arrays.asList(
- changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 119L));
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 114L, "2022-01-02"),
+ changelogRow("-D", "Euro", 114L, "2022-01-02"),
+ changelogRow("+I", "Euro", 119L, "2022-01-02"),
+ changelogRow("-D", "Euro", 119L, "2022-01-02"),
+ changelogRow("+I", "Euro", 115L, "2022-01-02")));
- collectAndCheck(tEnv, managedTable, Collections.emptyMap(),
expectedRecords).close();
+ // input is dailyRatesChangelogWithoutUB()
+ collectChangelogFromTimestampAndCheck(
+ false,
+ true,
+ true,
+ 0,
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-U", "Euro", 114L, "2022-01-01"),
+ changelogRow("+U", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02")));
}
// ------------------------ Tools ----------------------------------
- private String prepareEnvAndWrite(
- boolean streaming, boolean enableLogStore, boolean partitioned,
boolean hasPk)
+ private String collectAndCheckBatchReadWrite(
+ boolean partitioned, boolean hasPk, List<Row> expected) throws
Exception {
+ return collectAndCheckUnderSameEnv(
+ false,
+ false,
+ true,
+ partitioned,
+ hasPk,
+ true,
+ Collections.emptyMap(),
+ expected)
+ .f0;
+ }
+
+ private String collectAndCheckStreamingReadWriteWithClose(
+ boolean enableLogStore,
+ boolean partitioned,
+ boolean hasPk,
+ Map<String, String> readHints,
+ List<Row> expected)
+ throws Exception {
+ Tuple2<String, BlockingIterator<Row, Row>> tuple =
+ collectAndCheckUnderSameEnv(
+ true, enableLogStore, false, partitioned, hasPk, true,
readHints, expected);
+ tuple.f1.close();
+ return tuple.f0;
+ }
+
+ private Tuple2<String, BlockingIterator<Row, Row>>
+ collectAndCheckStreamingReadWriteWithoutClose(
+ Map<String, String> readHints, List<Row> expected) throws
Exception {
+ return collectAndCheckUnderSameEnv(
+ true, true, false, true, true, true, readHints, expected);
+ }
+
+ private void collectLatestLogAndCheck(
+ boolean insertOnly, boolean partitioned, boolean hasPk, List<Row>
expected)
+ throws Exception {
+ Map<String, String> hints = new HashMap<>();
+ hints.put(
+ LOG_PREFIX + LogOptions.SCAN.key(),
+ LogOptions.LogStartupMode.LATEST.name().toLowerCase());
+ collectAndCheckUnderSameEnv(
+ true, true, insertOnly, partitioned, hasPk, false,
hints, expected)
+ .f1
+ .close();
+ }
+
+ private void collectChangelogFromTimestampAndCheck(
+ boolean insertOnly,
+ boolean partitioned,
+ boolean hasPk,
+ long timestamp,
+ List<Row> expected)
+ throws Exception {
+ Map<String, String> hints = new HashMap<>();
+ hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp");
+ hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(),
String.valueOf(timestamp));
+ collectAndCheckUnderSameEnv(
+ true, true, insertOnly, partitioned, hasPk, true,
hints, expected)
+ .f1
+ .close();
+ }
+
+ private Tuple2<String, BlockingIterator<Row, Row>>
collectAndCheckUnderSameEnv(
+ boolean streaming,
+ boolean enableLogStore,
+ boolean insertOnly,
+ boolean partitioned,
+ boolean hasPk,
+ boolean writeFirst,
+ Map<String, String> readHints,
+ List<Row> expected)
throws Exception {
Map<String, String> tableOptions = new HashMap<>();
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
@@ -282,7 +545,11 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
String helperTableDdl;
if (streaming) {
helperTableDdl =
- prepareHelperSourceWithChangelogRecords(sourceTable,
partitioned, hasPk);
+ insertOnly
+ ? prepareHelperSourceWithInsertOnlyRecords(
+ sourceTable, partitioned, hasPk)
+ : prepareHelperSourceWithChangelogRecords(
+ sourceTable, partitioned, hasPk);
env = buildStreamEnv();
builder.inStreamingMode();
} else {
@@ -292,13 +559,29 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
builder.inBatchMode();
}
String managedTableDdl = prepareManagedTableDdl(sourceTable,
managedTable, tableOptions);
- String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
tEnv = StreamTableEnvironment.create(env, builder.build());
tEnv.executeSql(helperTableDdl);
tEnv.executeSql(managedTableDdl);
- tEnv.executeSql(insertQuery).await();
- return managedTable;
+
+ String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
+ String selectQuery = prepareSimpleSelectQuery(managedTable, readHints);
+
+ BlockingIterator<Row, Row> iterator;
+ if (writeFirst) {
+ tEnv.executeSql(insertQuery).await();
+ iterator =
BlockingIterator.of(tEnv.executeSql(selectQuery).collect());
+ } else {
+ iterator =
BlockingIterator.of(tEnv.executeSql(selectQuery).collect());
+ tEnv.executeSql(insertQuery).await();
+ }
+ if (expected.isEmpty()) {
+ assertNoMoreRecords(iterator);
+ } else {
+ assertThat(iterator.collect(expected.size(), 10, TimeUnit.SECONDS))
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+ return Tuple2.of(managedTable, iterator);
}
private void prepareEnvAndOverwrite(
@@ -465,7 +748,7 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
private static String buildHints(Map<String, String> hints) {
if (hints.size() > 0) {
- StringBuilder hintsBuilder = new StringBuilder("/* + OPTIONS (");
+ StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS (");
hints.forEach(
(k, v) -> {
hintsBuilder.append(String.format("'%s'", k));
@@ -473,7 +756,7 @@ public class ReadWriteTableITCase extends
KafkaTableTestBase {
hintsBuilder.append(String.format("'%s', ", v));
});
int len = hintsBuilder.length();
- hintsBuilder.deleteCharAt(len - 1);
+ hintsBuilder.deleteCharAt(len - 2);
hintsBuilder.append(") */");
return hintsBuilder.toString();
}
diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml
index 22787ae..04b8e8a 100644
--- a/flink-table-store-kafka/pom.xml
+++ b/flink-table-store-kafka/pom.xml
@@ -110,7 +110,7 @@ under the License.
<!-- include 2.0 server for tests -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>2.4.1</version>
+ <version>2.8.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
diff --git
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index d504802..60308a0 100644
---
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -52,7 +52,7 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
private final int[] primaryKey;
- @Nullable private final DeserializationSchema<RowData> keyDeserializer;
+ @Nullable private final DeserializationSchema<RowData>
primaryKeyDeserializer;
private final DeserializationSchema<RowData> valueDeserializer;
@@ -69,7 +69,7 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
Properties properties,
DataType physicalType,
int[] primaryKey,
- @Nullable DeserializationSchema<RowData> keyDeserializer,
+ @Nullable DeserializationSchema<RowData> primaryKeyDeserializer,
DeserializationSchema<RowData> valueDeserializer,
@Nullable int[][] projectFields,
LogConsistency consistency,
@@ -79,7 +79,7 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
this.properties = properties;
this.physicalType = physicalType;
this.primaryKey = primaryKey;
- this.keyDeserializer = keyDeserializer;
+ this.primaryKeyDeserializer = primaryKeyDeserializer;
this.valueDeserializer = valueDeserializer;
this.projectFields = projectFields;
this.consistency = consistency;
@@ -95,7 +95,7 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
properties.setProperty(ISOLATION_LEVEL_CONFIG,
"read_committed");
break;
case EVENTUAL:
- if (keyDeserializer == null) {
+ if (primaryKeyDeserializer == null) {
throw new IllegalArgumentException(
"Can not use EVENTUAL consistency mode for non-pk
table.");
}
@@ -117,7 +117,7 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
new KafkaLogDeserializationSchema(
physicalType,
primaryKey,
- keyDeserializer,
+ primaryKeyDeserializer,
valueDeserializer,
projectFields));
}