This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit fc31198d864ebafd36cf23ce302b6c1b4038b579 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Wed Apr 5 14:49:12 2023 -0700 [FLINK-31740] [upsert-kafka] Allow setting boundedness options for upsert-kafka This closes #22. --- .../kafka/table/KafkaConnectorOptionsUtil.java | 2 +- .../table/UpsertKafkaDynamicTableFactory.java | 19 +- .../table/UpsertKafkaDynamicTableFactoryTest.java | 227 ++++++++++++++++++++- .../kafka/table/UpsertKafkaTableITCase.java | 199 ++++++++++++++++++ 4 files changed, 441 insertions(+), 6 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java index ef70644e..d6390e27 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java @@ -189,7 +189,7 @@ class KafkaConnectorOptionsUtil { }); } - private static void validateScanBoundedMode(ReadableConfig tableOptions) { + static void validateScanBoundedMode(ReadableConfig tableOptions) { tableOptions .getOptional(SCAN_BOUNDED_MODE) .ifPresent( diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java index 254e1bf9..b9f2ea71 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -25,8 +25,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; -import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -57,6 +57,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM; @@ -68,9 +71,11 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode; /** Upsert-Kafka factory. */ public class UpsertKafkaDynamicTableFactory @@ -101,6 +106,9 @@ public class UpsertKafkaDynamicTableFactory options.add(SINK_PARALLELISM); options.add(SINK_BUFFER_FLUSH_INTERVAL); options.add(SINK_BUFFER_FLUSH_MAX_ROWS); + options.add(SCAN_BOUNDED_MODE); + options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); + options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); return options; } @@ -129,6 +137,8 @@ public class UpsertKafkaDynamicTableFactory // always use earliest to keep data integrity StartupMode earliest = StartupMode.EARLIEST; + final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + return new KafkaDynamicSource( context.getPhysicalRowDataType(), keyDecodingFormat, @@ -142,9 +152,9 @@ public class UpsertKafkaDynamicTableFactory earliest, Collections.emptyMap(), 0, - BoundedMode.UNBOUNDED, - Collections.emptyMap(), - 0, + boundedOptions.boundedMode, + boundedOptions.specificOffsets, + boundedOptions.boundedTimestampMillis, true, context.getObjectIdentifier().asSummaryString()); } @@ -228,6 +238,7 @@ public class UpsertKafkaDynamicTableFactory Format valueFormat, int[] primaryKeyIndexes) { validateTopic(tableOptions); + validateScanBoundedMode(tableOptions); validateFormat(keyFormat, valueFormat, tableOptions); validatePKConstraints(primaryKeyIndexes); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 5caaaa0a..959c44cf 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -21,11 +21,14 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; import org.apache.flink.formats.avro.RowDataToAvroConverters; @@ -64,23 +67,29 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.util.TestLogger; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.function.Consumer; +import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link UpsertKafkaDynamicTableFactory}. */ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { @@ -390,6 +399,134 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { RowDataToAvroConverters.createConverter(rowType)); } + // -------------------------------------------------------------------------------------------- + // Bounded end-offset tests + // -------------------------------------------------------------------------------------------- + + @Test + public void testBoundedSpecificOffsetsValidate() { + final Map<String, String> options = getFullSourceOptions(); + options.put( + KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), + ScanBoundedMode.SPECIFIC_OFFSETS.toString()); + + assertThatThrownBy(() -> createTableSource(SOURCE_SCHEMA, options)) + .isInstanceOf(ValidationException.class) + .cause() + .hasMessageContaining( + "'scan.bounded.specific-offsets' is required in 'specific-offsets' bounded mode but missing."); + } + + @Test + public void testBoundedSpecificOffsets() { + testBoundedOffsets( + ScanBoundedMode.SPECIFIC_OFFSETS, + options -> { + options.put("scan.bounded.specific-offsets", "partition:0,offset:2"); + }, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0); + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.noInteractions()); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, 2L); + }); + } + + @Test + public void testBoundedLatestOffset() { + testBoundedOffsets( + ScanBoundedMode.LATEST_OFFSET, + options -> {}, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0); + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.noInteractions()); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET); + }); + } + + @Test + public void testBoundedGroupOffsets() { + testBoundedOffsets( + ScanBoundedMode.GROUP_OFFSETS, + options -> { + options.put("properties.group.id", "dummy"); + }, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0); + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.noInteractions()); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, KafkaPartitionSplit.COMMITTED_OFFSET); + }); + } + + @Test + public void testBoundedTimestamp() { + testBoundedOffsets( + ScanBoundedMode.TIMESTAMP, + options -> { + options.put("scan.bounded.timestamp-millis", "1"); + }, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0); + long offsetForTimestamp = 123L; + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.timestampAndEnd( + partitions -> { + assertThat(partitions) + .containsOnlyKeys(partition) + .containsEntry(partition, 1L); + Map<TopicPartition, OffsetAndTimestamp> result = + new HashMap<>(); + result.put( + partition, + new OffsetAndTimestamp( + offsetForTimestamp, 1L)); + return result; + }, + partitions -> { + Map<TopicPartition, Long> result = new HashMap<>(); + result.put( + partition, + // the end offset is bigger than given by + // timestamp + // to make sure the one for timestamp is + // used + offsetForTimestamp + 1000L); + return result; + })); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, offsetForTimestamp); + }); + } + // -------------------------------------------------------------------------------------------- // Negative tests // -------------------------------------------------------------------------------------------- @@ -647,7 +784,7 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { null); } - private void assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) { + private KafkaSource<?> assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) { assertThat(provider).isInstanceOf(DataStreamScanProvider.class); final DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) provider; final Transformation<RowData> transformation = @@ -662,5 +799,93 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { (SourceTransformation<RowData, KafkaPartitionSplit, KafkaSourceEnumState>) transformation; assertThat(sourceTransformation.getSource()).isInstanceOf(KafkaSource.class); + return (KafkaSource<?>) sourceTransformation.getSource(); + } + + private void testBoundedOffsets( + ScanBoundedMode boundedMode, + Consumer<Map<String, String>> optionsConfig, + Consumer<KafkaSource<?>> validator) { + final Map<String, String> options = getFullSourceOptions(); + options.put(KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), boundedMode.toString()); + optionsConfig.accept(options); + + final DynamicTableSource tableSource = createTableSource(SOURCE_SCHEMA, options); + assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class); + ScanTableSource.ScanRuntimeProvider provider = + ((KafkaDynamicSource) tableSource) + .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(provider).isInstanceOf(DataStreamScanProvider.class); + final KafkaSource<?> kafkaSource = assertKafkaSource(provider); + validator.accept(kafkaSource); + } + + private interface OffsetsRetriever + extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {} + + private interface TimestampOffsetsRetriever + extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {} + + private static final class MockPartitionOffsetsRetriever + implements OffsetsInitializer.PartitionOffsetsRetriever { + + public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL = + partitions -> { + throw new UnsupportedOperationException( + "The method was not supposed to be called"); + }; + private final OffsetsRetriever committedOffsets; + private final OffsetsRetriever endOffsets; + private final OffsetsRetriever beginningOffsets; + private final TimestampOffsetsRetriever offsetsForTimes; + + static MockPartitionOffsetsRetriever noInteractions() { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, + UNSUPPORTED_RETRIEVAL, + UNSUPPORTED_RETRIEVAL, + partitions -> { + throw new UnsupportedOperationException( + "The method was not supposed to be called"); + }); + } + + static MockPartitionOffsetsRetriever timestampAndEnd( + TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever); + } + + private MockPartitionOffsetsRetriever( + OffsetsRetriever committedOffsets, + OffsetsRetriever endOffsets, + OffsetsRetriever beginningOffsets, + TimestampOffsetsRetriever offsetsForTimes) { + this.committedOffsets = committedOffsets; + this.endOffsets = endOffsets; + this.beginningOffsets = beginningOffsets; + this.offsetsForTimes = offsetsForTimes; + } + + @Override + public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) { + return committedOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { + return endOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + return beginningOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( + Map<TopicPartition, Long> timestampsToSearch) { + return offsetsForTimes.apply(timestampsToSearch); + } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java index 109f4402..23f638f6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java @@ -33,17 +33,20 @@ import org.junit.runners.Parameterized; import java.time.Duration; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import static org.apache.flink.api.common.typeinfo.Types.INT; import static org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME; import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED; import static org.apache.flink.api.common.typeinfo.Types.STRING; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.comparedWithKeyAndOrder; import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults; @@ -388,6 +391,202 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase { deleteTestTopic(topic); } + @Test + public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception { + final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 1, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + final String bootstraps = getBootstrapServers(); + + // table with upsert-kafka connector, bounded mode up to offset=2 + final String createTableSql = + String.format( + "CREATE TABLE upsert_kafka (\n" + + " `user_id` BIGINT,\n" + + " `event_id` BIGINT,\n" + + " `payload` STRING,\n" + + " PRIMARY KEY (event_id, user_id) NOT ENFORCED" + + ") WITH (\n" + + " 'connector' = 'upsert-kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'key.format' = '%s',\n" + + " 'value.format' = '%s',\n" + + " 'value.fields-include' = 'ALL',\n" + + " 'scan.bounded.mode' = 'specific-offsets',\n" + + " 'scan.bounded.specific-offsets' = 'partition:0,offset:2'" + + ")", + topic, bootstraps, format, format); + tEnv.executeSql(createTableSql); + + // insert multiple values to have more records past offset=2 + final String insertValuesSql = + "INSERT INTO upsert_kafka\n" + + "VALUES\n" + + " (1, 100, 'payload 1'),\n" + + " (1, 100, 'payload 1-new'),\n" + + " (2, 101, 'payload 2'),\n" + + " (3, 102, 'payload 3')"; + tEnv.executeSql(insertValuesSql).await(); + + // results should only have records up to offset=2 + final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from upsert_kafka")); + final List<Row> expected = + Arrays.asList( + changelogRow("+I", 1L, 100L, "payload 1"), + changelogRow("-U", 1L, 100L, "payload 1"), + changelogRow("+U", 1L, 100L, "payload 1-new")); + assertThat(results).satisfies(matching(deepEqualTo(expected, true))); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + + @Test + public void testUpsertKafkaSourceSinkWithBoundedTimestamp() throws Exception { + final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 1, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + final String bootstraps = getBootstrapServers(); + + // table with upsert-kafka connector, bounded mode up to timestamp 2023-03-10T14:00:00.000 + final String createTableSql = + String.format( + "CREATE TABLE upsert_kafka (\n" + + " `user_id` BIGINT,\n" + + " `timestamp` TIMESTAMP(3) METADATA,\n" + + " `event_id` BIGINT,\n" + + " `payload` STRING,\n" + + " PRIMARY KEY (event_id, user_id) NOT ENFORCED" + + ") WITH (\n" + + " 'connector' = 'upsert-kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'key.format' = '%s',\n" + + " 'value.format' = '%s',\n" + + " 'value.fields-include' = 'ALL',\n" + + " 'scan.bounded.mode' = 'timestamp',\n" + + " 'scan.bounded.timestamp-millis' = '%d'" + + ")", + topic, + bootstraps, + format, + format, + LocalDateTime.parse("2023-03-10T14:00:00.000") + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli()); + tEnv.executeSql(createTableSql); + + // insert multiple values with timestamp starting from 2023-03-08 up to 2023-03-11 + final String insertValuesSql = + "INSERT INTO upsert_kafka\n" + + "VALUES\n" + + " (1, TIMESTAMP '2023-03-08 08:10:10.666', 100, 'payload 1'),\n" + + " (2, TIMESTAMP '2023-03-09 13:12:11.123', 101, 'payload 2'),\n" + + " (1, TIMESTAMP '2023-03-10 12:09:50.321', 100, 'payload 1-new'),\n" + + " (2, TIMESTAMP '2023-03-11 17:15:13.457', 101, 'payload 2-new')"; + tEnv.executeSql(insertValuesSql).await(); + + // results should only have records up to timestamp 2023-03-10T14:00:00.000 + final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from upsert_kafka")); + final List<Row> expected = + Arrays.asList( + changelogRow( + "+I", + 1L, + LocalDateTime.parse("2023-03-08T08:10:10.666"), + 100L, + "payload 1"), + changelogRow( + "+I", + 2L, + LocalDateTime.parse("2023-03-09T13:12:11.123"), + 101L, + "payload 2"), + changelogRow( + "-U", + 1L, + LocalDateTime.parse("2023-03-08T08:10:10.666"), + 100L, + "payload 1"), + changelogRow( + "+U", + 1L, + LocalDateTime.parse("2023-03-10T12:09:50.321"), + 100L, + "payload 1-new")); + assertThat(results).satisfies(matching(deepEqualTo(expected, true))); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + + /** + * Tests that setting bounded end offset that is before the earliest offset results in 0 + * results. + */ + @Test + public void testUpsertKafkaSourceSinkWithZeroLengthBoundedness() throws Exception { + final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 1, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + final String bootstraps = getBootstrapServers(); + + // table with upsert-kafka connector, bounded mode up to timestamp 2023-03-10T14:00:00.000 + final String createTableSql = + String.format( + "CREATE TABLE upsert_kafka (\n" + + " `user_id` BIGINT,\n" + + " `timestamp` TIMESTAMP(3) METADATA,\n" + + " `event_id` BIGINT,\n" + + " `payload` STRING,\n" + + " PRIMARY KEY (event_id, user_id) NOT ENFORCED" + + ") WITH (\n" + + " 'connector' = 'upsert-kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'key.format' = '%s',\n" + + " 'value.format' = '%s',\n" + + " 'value.fields-include' = 'ALL',\n" + + " 'scan.bounded.mode' = 'timestamp',\n" + + " 'scan.bounded.timestamp-millis' = '%d'" + + ")", + topic, + bootstraps, + format, + format, + LocalDateTime.parse("2023-03-10T14:00:00.000") + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli()); + tEnv.executeSql(createTableSql); + + // insert multiple values with timestamp starting from 2023-03-11 (which is past the bounded + // end timestamp) + final String insertValuesSql = + "INSERT INTO upsert_kafka\n" + + "VALUES\n" + + " (1, TIMESTAMP '2023-03-11 08:10:10.666', 100, 'payload 1'),\n" + + " (2, TIMESTAMP '2023-03-12 13:12:11.123', 101, 'payload 2'),\n" + + " (1, TIMESTAMP '2023-03-13 12:09:50.321', 100, 'payload 1-new'),\n" + + " (2, TIMESTAMP '2023-03-14 17:15:13.457', 101, 'payload 2-new')"; + tEnv.executeSql(insertValuesSql).await(); + + // results should be empty + final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from upsert_kafka")); + assertThat(results).satisfies(matching(deepEqualTo(Collections.emptyList(), true))); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + private void wordCountToUpsertKafka(String wordCountTable) throws Exception { String bootstraps = getBootstrapServers();
