This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.1 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 7efe80c25cd6b3ecd677a71714616411546a7604 Author: Jane Chan <[email protected]> AuthorDate: Wed Mar 30 14:43:42 2022 +0800 [FLINK-26879] Fix that LogStartupMode.LATEST and LogStartupMode.FROM_TIMESTAMP don't work This closes #65 --- flink-table-store-connector/pom.xml | 8 +- .../flink/table/store/connector/TableStore.java | 37 +- .../store/connector/source/TableStoreSource.java | 1 - .../store/connector/ReadWriteTableITCase.java | 427 +++++++++++++++++---- flink-table-store-kafka/pom.xml | 2 +- .../table/store/kafka/KafkaLogSourceProvider.java | 10 +- 6 files changed, 380 insertions(+), 105 deletions(-) diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml index f2e2022..cf4fb28 100644 --- a/flink-table-store-connector/pom.xml +++ b/flink-table-store-connector/pom.xml @@ -212,7 +212,7 @@ under the License. <!-- include 2.0 server for tests --> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> - <version>2.4.1</version> + <version>2.8.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> @@ -239,6 +239,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>2.8.1</version> + </dependency> + + <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>${testcontainers.version}</version> diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java index 9374933..d222018 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java @@ -227,8 +227,6 @@ public class TableStore { private boolean isContinuous = false; - private boolean isHybrid = true; - @Nullable private int[][] projectedFields; @Nullable private Predicate partitionPredicate; @@ -257,11 +255,6 @@ public class TableStore { return this; } - public SourceBuilder withHybridMode(boolean isHybrid) { - this.isHybrid = isHybrid; - return this; - } - public SourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) { this.logSourceProvider = logSourceProvider; return this; @@ -271,20 +264,14 @@ public class TableStore { return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis(); } - private FileStoreSource buildFileSource(boolean isContinuous) { - FileStore fileStore = buildFileStore(); - - boolean latestContinuous = false; - if (isContinuous) { - LogStartupMode startupMode = logOptions().get(SCAN); - latestContinuous = startupMode == LogStartupMode.LATEST; - } + private FileStoreSource buildFileSource( + boolean isContinuous, boolean continuousScanLatest) { return new FileStoreSource( - fileStore, + buildFileStore(), primaryKeys.length == 0, isContinuous, discoveryIntervalMills(), - latestContinuous, + continuousScanLatest, projectedFields, partitionPredicate, fieldPredicate); @@ -292,22 +279,22 @@ public class TableStore { public Source<RowData, ?, ?> build() { if (isContinuous) { + LogStartupMode startupMode = logOptions().get(SCAN); if (logSourceProvider == null) { - return buildFileSource(true); - } - - if (isHybrid) { + return buildFileSource(true, startupMode == LogStartupMode.LATEST); + } else { + if (startupMode != LogStartupMode.FULL) { + return logSourceProvider.createSource(null); + } return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder( - buildFileSource(false)) + buildFileSource(false, false)) .addSource( new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED) .build(); - } else { - return logSourceProvider.createSource(null); } } else { - return buildFileSource(false); + return buildFileSource(false, false); } } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java index 11da93e..8de30f6 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java @@ -135,7 +135,6 @@ public class TableStoreSource tableStore .sourceBuilder() .withContinuousMode(streaming) - .withHybridMode(streaming && logSourceProvider != null) .withLogSourceProvider(logSourceProvider) .withProjection(projectFields) .withPartitionPredicate(PredicateConverter.convert(partitionFilters)) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java index 86ec5dc..3458529 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.table.store.connector; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; @@ -29,6 +30,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.store.file.FileStoreOptions; import org.apache.flink.table.store.file.utils.BlockingIterator; import org.apache.flink.table.store.kafka.KafkaTableTestBase; +import org.apache.flink.table.store.log.LogOptions; import org.apache.flink.types.Row; import org.junit.Test; @@ -58,8 +60,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { @Test public void testBatchWriteWithPartitionedRecordsWithPk() throws Exception { - String managedTable = prepareEnvAndWrite(false, false, true, true); - // input is dailyRates() List<Row> expectedRecords = Arrays.asList( @@ -70,7 +70,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { // part = 2022-01-02 changelogRow("+I", "Euro", 119L, "2022-01-02")); // test batch read - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + String managedTable = collectAndCheckBatchReadWrite(true, true, expectedRecords); checkFileStorePath(tEnv, managedTable); // test streaming read @@ -91,22 +91,27 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { assertNoMoreRecords(streamIter); // batch read to check partition refresh - expectedRecords = new ArrayList<>(expectedRecords); - expectedRecords.remove(3); - expectedRecords.add(changelogRow("+I", "Euro", 100L, "2022-01-02")); - expectedRecords.add(changelogRow("+I", "Yen", 1L, "2022-01-02")); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + collectAndCheck( + tEnv, + managedTable, + Collections.emptyMap(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 100L, "2022-01-02"), + changelogRow("+I", "Yen", 1L, "2022-01-02"))) + .close(); } @Test public void testBatchWriteWithPartitionedRecordsWithoutPk() throws Exception { - String managedTable = prepareEnvAndWrite(false, false, true, false); - // input is dailyRates() - List<Row> expectedRecords = dailyRates(); // test batch read - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + String managedTable = collectAndCheckBatchReadWrite(true, false, dailyRates()); checkFileStorePath(tEnv, managedTable); // overwrite dynamic partition @@ -135,15 +140,15 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { @Test public void testBatchWriteWithNonPartitionedRecordsWithPk() throws Exception { - String managedTable = prepareEnvAndWrite(false, false, false, true); - // input is rates() - List<Row> expectedRecords = - Arrays.asList( - changelogRow("+I", "US Dollar", 102L), - changelogRow("+I", "Yen", 1L), - changelogRow("+I", "Euro", 119L)); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + String managedTable = + collectAndCheckBatchReadWrite( + false, + true, + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Yen", 1L), + changelogRow("+I", "Euro", 119L))); checkFileStorePath(tEnv, managedTable); // overwrite the whole table @@ -151,37 +156,35 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { managedTable, Collections.emptyMap(), Collections.singletonList(new String[] {"'Euro'", "100"})); - expectedRecords = new ArrayList<>(expectedRecords); - expectedRecords.clear(); - expectedRecords.add(changelogRow("+I", "Euro", 100L)); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + collectAndCheck( + tEnv, + managedTable, + Collections.emptyMap(), + Collections.singletonList(changelogRow("+I", "Euro", 100L))); } @Test public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception { - String managedTable = prepareEnvAndWrite(false, false, false, false); - // input is rates() - List<Row> expectedRecords = rates(); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + String managedTable = collectAndCheckBatchReadWrite(false, false, rates()); checkFileStorePath(tEnv, managedTable); } @Test public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception { - String managedTable = prepareEnvAndWrite(true, true, true, true); - // input is dailyRatesChangelogWithoutUB() // test hybrid read - List<Row> expectedRecords = - Arrays.asList( - // part = 2022-01-01 - changelogRow("+I", "US Dollar", 102L, "2022-01-01"), - // part = 2022-01-02 - changelogRow("+I", "Euro", 119L, "2022-01-02")); - BlockingIterator<Row, Row> streamIter = - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords); + Tuple2<String, BlockingIterator<Row, Row>> tuple = + collectAndCheckStreamingReadWriteWithoutClose( + Collections.emptyMap(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 119L, "2022-01-02"))); + String managedTable = tuple.f0; checkFileStorePath(tEnv, managedTable); + BlockingIterator<Row, Row> streamIter = tuple.f1; // overwrite partition 2022-01-02 prepareEnvAndOverwrite( @@ -210,64 +213,324 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { @Test public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception { - String managedTable = prepareEnvAndWrite(true, false, true, true); - // input is dailyRatesChangelogWithoutUB() // file store continuous read // will not merge, at least collect two records - List<Row> expectedRecords = + checkFileStorePath( + tEnv, + collectAndCheckStreamingReadWriteWithClose( + false, + true, + true, + Collections.emptyMap(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 119L, "2022-01-02")))); + } + + @Test + public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception { + // input is dailyRatesChangelogWithUB() + // enable log store, file store bounded read with merge + checkFileStorePath( + tEnv, + collectAndCheckStreamingReadWriteWithClose( + true, + true, + false, + Collections.emptyMap(), + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 115L, "2022-01-02")))); + } + + @Test + public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception { + // input is ratesChangelogWithoutUB() + // enable log store, file store bounded read with merge + checkFileStorePath( + tEnv, + collectAndCheckStreamingReadWriteWithClose( + true, + false, + true, + Collections.emptyMap(), + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 119L)))); + } + + @Test + public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception { + // input is ratesChangelogWithUB() + // enable log store, with default full scan mode, will merge + checkFileStorePath( + tEnv, + collectAndCheckStreamingReadWriteWithClose( + true, + false, + false, + Collections.emptyMap(), + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 119L)))); + } + + @Test + public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception { + // input is dailyRatesChangelogWithoutUB() + collectLatestLogAndCheck( + false, + true, + true, Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("-U", "Euro", 114L, "2022-01-01"), + changelogRow("+U", "Euro", 116L, "2022-01-01"), + changelogRow("-D", "Yen", 1L, "2022-01-01"), + changelogRow("-D", "Euro", 116L, "2022-01-01"), // part = 2022-01-02 - changelogRow("+I", "Euro", 119L, "2022-01-02")); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + changelogRow("+I", "Euro", 119L, "2022-01-02"))); } @Test - public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception { - String managedTable = prepareEnvAndWrite(true, true, true, false); - + public void testReadLatestChangelogOfPartitionedRecordsWithoutPk() throws Exception { // input is dailyRatesChangelogWithUB() - // enable log store, file store bounded read with merge - List<Row> expectedRecords = + collectLatestLogAndCheck( + false, + true, + false, Arrays.asList( // part = 2022-01-01 changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Euro", 116L, "2022-01-01"), + changelogRow("-D", "Euro", 116L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("-D", "Yen", 1L, "2022-01-01"), // part = 2022-01-02 - changelogRow("+I", "Euro", 115L, "2022-01-02")); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + changelogRow("+I", "Euro", 114L, "2022-01-02"), + changelogRow("-D", "Euro", 114L, "2022-01-02"), + changelogRow("+I", "Euro", 119L, "2022-01-02"), + changelogRow("-D", "Euro", 119L, "2022-01-02"), + changelogRow("+I", "Euro", 115L, "2022-01-02"))); } @Test - public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception { - String managedTable = prepareEnvAndWrite(true, true, false, true); - + public void testReadLatestChangelogOfNonPartitionedRecordsWithPk() throws Exception { // input is ratesChangelogWithoutUB() - // enable log store, file store bounded read with merge - List<Row> expectedRecords = + collectLatestLogAndCheck( + false, + false, + true, Arrays.asList( - changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L)); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Yen", 1L), + changelogRow("-U", "Euro", 114L), + changelogRow("+U", "Euro", 116L), + changelogRow("-D", "Euro", 116L), + changelogRow("+I", "Euro", 119L), + changelogRow("-D", "Yen", 1L))); } @Test - public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception { - String managedTable = prepareEnvAndWrite(true, true, false, false); - + public void testReadLatestChangelogOfNonPartitionedRecordsWithoutPk() throws Exception { // input is ratesChangelogWithUB() - // enable log store, with default full scan mode, will merge - List<Row> expectedRecords = + collectLatestLogAndCheck( + false, + false, + false, + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Yen", 1L), + changelogRow("-D", "Euro", 114L), + changelogRow("+I", "Euro", 116L), + changelogRow("-D", "Euro", 116L), + changelogRow("+I", "Euro", 119L), + changelogRow("-D", "Euro", 119L), + changelogRow("+I", "Euro", 119L), + changelogRow("-D", "Yen", 1L))); + } + + @Test + public void testReadLatestChangelogOfInsertOnlyRecords() throws Exception { + // input is rates() + List<Row> expected = + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Yen", 1L), + changelogRow("-U", "Euro", 114L), + changelogRow("+U", "Euro", 119L)); + + // currency as pk + collectLatestLogAndCheck(true, false, true, expected); + + // without pk + collectLatestLogAndCheck(true, false, true, expected); + } + + @Test + public void testReadInsertOnlyChangelogFromTimestamp() throws Exception { + // input is dailyRates() + collectChangelogFromTimestampAndCheck( + true, + true, + true, + 0, + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 119L, "2022-01-02"))); + + collectChangelogFromTimestampAndCheck(true, true, false, 0, dailyRates()); + + // input is rates() + collectChangelogFromTimestampAndCheck( + true, + false, + true, + 0, + Arrays.asList( + changelogRow("+I", "US Dollar", 102L), + changelogRow("+I", "Euro", 114L), + changelogRow("+I", "Yen", 1L), + changelogRow("-U", "Euro", 114L), + changelogRow("+U", "Euro", 119L))); + + collectChangelogFromTimestampAndCheck(true, false, false, 0, rates()); + + collectChangelogFromTimestampAndCheck( + true, false, false, Long.MAX_VALUE - 1, Collections.emptyList()); + } + + @Test + public void testReadRetractChangelogFromTimestamp() throws Exception { + // input is dailyRatesChangelogWithUB() + collectChangelogFromTimestampAndCheck( + false, + true, + false, + 0, Arrays.asList( - changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L)); + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Euro", 116L, "2022-01-01"), + changelogRow("-D", "Euro", 116L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("-D", "Yen", 1L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 114L, "2022-01-02"), + changelogRow("-D", "Euro", 114L, "2022-01-02"), + changelogRow("+I", "Euro", 119L, "2022-01-02"), + changelogRow("-D", "Euro", 119L, "2022-01-02"), + changelogRow("+I", "Euro", 115L, "2022-01-02"))); - collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close(); + // input is dailyRatesChangelogWithoutUB() + collectChangelogFromTimestampAndCheck( + false, + true, + true, + 0, + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 102L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("-U", "Euro", 114L, "2022-01-01"), + changelogRow("+U", "Euro", 116L, "2022-01-01"), + changelogRow("-D", "Yen", 1L, "2022-01-01"), + changelogRow("-D", "Euro", 116L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 119L, "2022-01-02"))); } // ------------------------ Tools ---------------------------------- - private String prepareEnvAndWrite( - boolean streaming, boolean enableLogStore, boolean partitioned, boolean hasPk) + private String collectAndCheckBatchReadWrite( + boolean partitioned, boolean hasPk, List<Row> expected) throws Exception { + return collectAndCheckUnderSameEnv( + false, + false, + true, + partitioned, + hasPk, + true, + Collections.emptyMap(), + expected) + .f0; + } + + private String collectAndCheckStreamingReadWriteWithClose( + boolean enableLogStore, + boolean partitioned, + boolean hasPk, + Map<String, String> readHints, + List<Row> expected) + throws Exception { + Tuple2<String, BlockingIterator<Row, Row>> tuple = + collectAndCheckUnderSameEnv( + true, enableLogStore, false, partitioned, hasPk, true, readHints, expected); + tuple.f1.close(); + return tuple.f0; + } + + private Tuple2<String, BlockingIterator<Row, Row>> + collectAndCheckStreamingReadWriteWithoutClose( + Map<String, String> readHints, List<Row> expected) throws Exception { + return collectAndCheckUnderSameEnv( + true, true, false, true, true, true, readHints, expected); + } + + private void collectLatestLogAndCheck( + boolean insertOnly, boolean partitioned, boolean hasPk, List<Row> expected) + throws Exception { + Map<String, String> hints = new HashMap<>(); + hints.put( + LOG_PREFIX + LogOptions.SCAN.key(), + LogOptions.LogStartupMode.LATEST.name().toLowerCase()); + collectAndCheckUnderSameEnv( + true, true, insertOnly, partitioned, hasPk, false, hints, expected) + .f1 + .close(); + } + + private void collectChangelogFromTimestampAndCheck( + boolean insertOnly, + boolean partitioned, + boolean hasPk, + long timestamp, + List<Row> expected) + throws Exception { + Map<String, String> hints = new HashMap<>(); + hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp"); + hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp)); + collectAndCheckUnderSameEnv( + true, true, insertOnly, partitioned, hasPk, true, hints, expected) + .f1 + .close(); + } + + private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv( + boolean streaming, + boolean enableLogStore, + boolean insertOnly, + boolean partitioned, + boolean hasPk, + boolean writeFirst, + Map<String, String> readHints, + List<Row> expected) throws Exception { Map<String, String> tableOptions = new HashMap<>(); rootPath = TEMPORARY_FOLDER.newFolder().getPath(); @@ -282,7 +545,11 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { String helperTableDdl; if (streaming) { helperTableDdl = - prepareHelperSourceWithChangelogRecords(sourceTable, partitioned, hasPk); + insertOnly + ? prepareHelperSourceWithInsertOnlyRecords( + sourceTable, partitioned, hasPk) + : prepareHelperSourceWithChangelogRecords( + sourceTable, partitioned, hasPk); env = buildStreamEnv(); builder.inStreamingMode(); } else { @@ -292,13 +559,29 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { builder.inBatchMode(); } String managedTableDdl = prepareManagedTableDdl(sourceTable, managedTable, tableOptions); - String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable); tEnv = StreamTableEnvironment.create(env, builder.build()); tEnv.executeSql(helperTableDdl); tEnv.executeSql(managedTableDdl); - tEnv.executeSql(insertQuery).await(); - return managedTable; + + String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable); + String selectQuery = prepareSimpleSelectQuery(managedTable, readHints); + + BlockingIterator<Row, Row> iterator; + if (writeFirst) { + tEnv.executeSql(insertQuery).await(); + iterator = BlockingIterator.of(tEnv.executeSql(selectQuery).collect()); + } else { + iterator = BlockingIterator.of(tEnv.executeSql(selectQuery).collect()); + tEnv.executeSql(insertQuery).await(); + } + if (expected.isEmpty()) { + assertNoMoreRecords(iterator); + } else { + assertThat(iterator.collect(expected.size(), 10, TimeUnit.SECONDS)) + .containsExactlyInAnyOrderElementsOf(expected); + } + return Tuple2.of(managedTable, iterator); } private void prepareEnvAndOverwrite( @@ -465,7 +748,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { private static String buildHints(Map<String, String> hints) { if (hints.size() > 0) { - StringBuilder hintsBuilder = new StringBuilder("/* + OPTIONS ("); + StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS ("); hints.forEach( (k, v) -> { hintsBuilder.append(String.format("'%s'", k)); @@ -473,7 +756,7 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { hintsBuilder.append(String.format("'%s', ", v)); }); int len = hintsBuilder.length(); - hintsBuilder.deleteCharAt(len - 1); + hintsBuilder.deleteCharAt(len - 2); hintsBuilder.append(") */"); return hintsBuilder.toString(); } diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml index bb8633f..17491e2 100644 --- a/flink-table-store-kafka/pom.xml +++ b/flink-table-store-kafka/pom.xml @@ -110,7 +110,7 @@ under the License. <!-- include 2.0 server for tests --> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> - <version>2.4.1</version> + <version>2.8.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java index d504802..60308a0 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java @@ -52,7 +52,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider { private final int[] primaryKey; - @Nullable private final DeserializationSchema<RowData> keyDeserializer; + @Nullable private final DeserializationSchema<RowData> primaryKeyDeserializer; private final DeserializationSchema<RowData> valueDeserializer; @@ -69,7 +69,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider { Properties properties, DataType physicalType, int[] primaryKey, - @Nullable DeserializationSchema<RowData> keyDeserializer, + @Nullable DeserializationSchema<RowData> primaryKeyDeserializer, DeserializationSchema<RowData> valueDeserializer, @Nullable int[][] projectFields, LogConsistency consistency, @@ -79,7 +79,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider { this.properties = properties; this.physicalType = physicalType; this.primaryKey = primaryKey; - this.keyDeserializer = keyDeserializer; + this.primaryKeyDeserializer = primaryKeyDeserializer; this.valueDeserializer = valueDeserializer; this.projectFields = projectFields; this.consistency = consistency; @@ -95,7 +95,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider { properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed"); break; case EVENTUAL: - if (keyDeserializer == null) { + if (primaryKeyDeserializer == null) { throw new IllegalArgumentException( "Can not use EVENTUAL consistency mode for non-pk table."); } @@ -117,7 +117,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider { new KafkaLogDeserializationSchema( physicalType, primaryKey, - keyDeserializer, + primaryKeyDeserializer, valueDeserializer, projectFields)); }
