This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit fc31198d864ebafd36cf23ce302b6c1b4038b579
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Apr 5 14:49:12 2023 -0700

    [FLINK-31740] [upsert-kafka] Allow setting boundedness options for upsert-kafka
    
    This closes #22.
---
 .../kafka/table/KafkaConnectorOptionsUtil.java     |   2 +-
 .../table/UpsertKafkaDynamicTableFactory.java      |  19 +-
 .../table/UpsertKafkaDynamicTableFactoryTest.java  | 227 ++++++++++++++++++++-
 .../kafka/table/UpsertKafkaTableITCase.java        | 199 ++++++++++++++++++
 4 files changed, 441 insertions(+), 6 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index ef70644e..d6390e27 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -189,7 +189,7 @@ class KafkaConnectorOptionsUtil {
                         });
     }
 
-    private static void validateScanBoundedMode(ReadableConfig tableOptions) {
+    static void validateScanBoundedMode(ReadableConfig tableOptions) {
         tableOptions
                 .getOptional(SCAN_BOUNDED_MODE)
                 .ifPresent(
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 254e1bf9..b9f2ea71 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -25,8 +25,8 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
-import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -57,6 +57,9 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
@@ -68,9 +71,11 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode;
 
 /** Upsert-Kafka factory. */
 public class UpsertKafkaDynamicTableFactory
@@ -101,6 +106,9 @@ public class UpsertKafkaDynamicTableFactory
         options.add(SINK_PARALLELISM);
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
         options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        options.add(SCAN_BOUNDED_MODE);
+        options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+        options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
         return options;
     }
 
@@ -129,6 +137,8 @@ public class UpsertKafkaDynamicTableFactory
         // always use earliest to keep data integrity
         StartupMode earliest = StartupMode.EARLIEST;
 
+        final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
+
         return new KafkaDynamicSource(
                 context.getPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -142,9 +152,9 @@ public class UpsertKafkaDynamicTableFactory
                 earliest,
                 Collections.emptyMap(),
                 0,
-                BoundedMode.UNBOUNDED,
-                Collections.emptyMap(),
-                0,
+                boundedOptions.boundedMode,
+                boundedOptions.specificOffsets,
+                boundedOptions.boundedTimestampMillis,
                 true,
                 context.getObjectIdentifier().asSummaryString());
     }
@@ -228,6 +238,7 @@ public class UpsertKafkaDynamicTableFactory
             Format valueFormat,
             int[] primaryKeyIndexes) {
         validateTopic(tableOptions);
+        validateScanBoundedMode(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
         validatePKConstraints(primaryKeyIndexes);
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 5caaaa0a..959c44cf 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
 import org.apache.flink.formats.avro.RowDataToAvroConverters;
@@ -64,23 +67,29 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link UpsertKafkaDynamicTableFactory}. */
 public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
@@ -390,6 +399,134 @@ public class UpsertKafkaDynamicTableFactoryTest extends 
TestLogger {
                 RowDataToAvroConverters.createConverter(rowType));
     }
 
+    // 
--------------------------------------------------------------------------------------------
+    // Bounded end-offset tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testBoundedSpecificOffsetsValidate() {
+        final Map<String, String> options = getFullSourceOptions();
+        options.put(
+                KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(),
+                ScanBoundedMode.SPECIFIC_OFFSETS.toString());
+
+        assertThatThrownBy(() -> createTableSource(SOURCE_SCHEMA, options))
+                .isInstanceOf(ValidationException.class)
+                .cause()
+                .hasMessageContaining(
+                        "'scan.bounded.specific-offsets' is required in 
'specific-offsets' bounded mode but missing.");
+    }
+
+    @Test
+    public void testBoundedSpecificOffsets() {
+        testBoundedOffsets(
+                ScanBoundedMode.SPECIFIC_OFFSETS,
+                options -> {
+                    options.put("scan.bounded.specific-offsets", 
"partition:0,offset:2");
+                },
+                source -> {
+                    
assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new 
TopicPartition(SOURCE_TOPIC, 0);
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    
MockPartitionOffsetsRetriever.noInteractions());
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, 2L);
+                });
+    }
+
+    @Test
+    public void testBoundedLatestOffset() {
+        testBoundedOffsets(
+                ScanBoundedMode.LATEST_OFFSET,
+                options -> {},
+                source -> {
+                    
assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new 
TopicPartition(SOURCE_TOPIC, 0);
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    
MockPartitionOffsetsRetriever.noInteractions());
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, 
KafkaPartitionSplit.LATEST_OFFSET);
+                });
+    }
+
+    @Test
+    public void testBoundedGroupOffsets() {
+        testBoundedOffsets(
+                ScanBoundedMode.GROUP_OFFSETS,
+                options -> {
+                    options.put("properties.group.id", "dummy");
+                },
+                source -> {
+                    
assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new 
TopicPartition(SOURCE_TOPIC, 0);
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    
MockPartitionOffsetsRetriever.noInteractions());
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, 
KafkaPartitionSplit.COMMITTED_OFFSET);
+                });
+    }
+
+    @Test
+    public void testBoundedTimestamp() {
+        testBoundedOffsets(
+                ScanBoundedMode.TIMESTAMP,
+                options -> {
+                    options.put("scan.bounded.timestamp-millis", "1");
+                },
+                source -> {
+                    
assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new 
TopicPartition(SOURCE_TOPIC, 0);
+                    long offsetForTimestamp = 123L;
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    
MockPartitionOffsetsRetriever.timestampAndEnd(
+                                            partitions -> {
+                                                assertThat(partitions)
+                                                        
.containsOnlyKeys(partition)
+                                                        
.containsEntry(partition, 1L);
+                                                Map<TopicPartition, 
OffsetAndTimestamp> result =
+                                                        new HashMap<>();
+                                                result.put(
+                                                        partition,
+                                                        new OffsetAndTimestamp(
+                                                                
offsetForTimestamp, 1L));
+                                                return result;
+                                            },
+                                            partitions -> {
+                                                Map<TopicPartition, Long> 
result = new HashMap<>();
+                                                result.put(
+                                                        partition,
+                                                        // the end offset is 
bigger than given by
+                                                        // timestamp
+                                                        // to make sure the 
one for timestamp is
+                                                        // used
+                                                        offsetForTimestamp + 
1000L);
+                                                return result;
+                                            }));
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, offsetForTimestamp);
+                });
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Negative tests
     // 
--------------------------------------------------------------------------------------------
@@ -647,7 +784,7 @@ public class UpsertKafkaDynamicTableFactoryTest extends 
TestLogger {
                 null);
     }
 
-    private void assertKafkaSource(ScanTableSource.ScanRuntimeProvider 
provider) {
+    private KafkaSource<?> 
assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
         assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
         final DataStreamScanProvider dataStreamScanProvider = 
(DataStreamScanProvider) provider;
         final Transformation<RowData> transformation =
@@ -662,5 +799,93 @@ public class UpsertKafkaDynamicTableFactoryTest extends 
TestLogger {
                         (SourceTransformation<RowData, KafkaPartitionSplit, 
KafkaSourceEnumState>)
                                 transformation;
         
assertThat(sourceTransformation.getSource()).isInstanceOf(KafkaSource.class);
+        return (KafkaSource<?>) sourceTransformation.getSource();
+    }
+
+    private void testBoundedOffsets(
+            ScanBoundedMode boundedMode,
+            Consumer<Map<String, String>> optionsConfig,
+            Consumer<KafkaSource<?>> validator) {
+        final Map<String, String> options = getFullSourceOptions();
+        options.put(KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), 
boundedMode.toString());
+        optionsConfig.accept(options);
+
+        final DynamicTableSource tableSource = 
createTableSource(SOURCE_SCHEMA, options);
+        assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
+        ScanTableSource.ScanRuntimeProvider provider =
+                ((KafkaDynamicSource) tableSource)
+                        
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final KafkaSource<?> kafkaSource = assertKafkaSource(provider);
+        validator.accept(kafkaSource);
+    }
+
+    private interface OffsetsRetriever
+            extends Function<Collection<TopicPartition>, Map<TopicPartition, 
Long>> {}
+
+    private interface TimestampOffsetsRetriever
+            extends Function<Map<TopicPartition, Long>, Map<TopicPartition, 
OffsetAndTimestamp>> {}
+
+    private static final class MockPartitionOffsetsRetriever
+            implements OffsetsInitializer.PartitionOffsetsRetriever {
+
+        public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL =
+                partitions -> {
+                    throw new UnsupportedOperationException(
+                            "The method was not supposed to be called");
+                };
+        private final OffsetsRetriever committedOffsets;
+        private final OffsetsRetriever endOffsets;
+        private final OffsetsRetriever beginningOffsets;
+        private final TimestampOffsetsRetriever offsetsForTimes;
+
+        static MockPartitionOffsetsRetriever noInteractions() {
+            return new MockPartitionOffsetsRetriever(
+                    UNSUPPORTED_RETRIEVAL,
+                    UNSUPPORTED_RETRIEVAL,
+                    UNSUPPORTED_RETRIEVAL,
+                    partitions -> {
+                        throw new UnsupportedOperationException(
+                                "The method was not supposed to be called");
+                    });
+        }
+
+        static MockPartitionOffsetsRetriever timestampAndEnd(
+                TimestampOffsetsRetriever retriever, OffsetsRetriever 
endOffsets) {
+            return new MockPartitionOffsetsRetriever(
+                    UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, 
retriever);
+        }
+
+        private MockPartitionOffsetsRetriever(
+                OffsetsRetriever committedOffsets,
+                OffsetsRetriever endOffsets,
+                OffsetsRetriever beginningOffsets,
+                TimestampOffsetsRetriever offsetsForTimes) {
+            this.committedOffsets = committedOffsets;
+            this.endOffsets = endOffsets;
+            this.beginningOffsets = beginningOffsets;
+            this.offsetsForTimes = offsetsForTimes;
+        }
+
+        @Override
+        public Map<TopicPartition, Long> 
committedOffsets(Collection<TopicPartition> partitions) {
+            return committedOffsets.apply(partitions);
+        }
+
+        @Override
+        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) {
+            return endOffsets.apply(partitions);
+        }
+
+        @Override
+        public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions) {
+            return beginningOffsets.apply(partitions);
+        }
+
+        @Override
+        public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
+                Map<TopicPartition, Long> timestampsToSearch) {
+            return offsetsForTimes.apply(timestampsToSearch);
+        }
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 109f4402..23f638f6 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -33,17 +33,20 @@ import org.junit.runners.Parameterized;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.typeinfo.Types.INT;
 import static org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
 import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED;
 import static org.apache.flink.api.common.typeinfo.Types.STRING;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.comparedWithKeyAndOrder;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
@@ -388,6 +391,202 @@ public class UpsertKafkaTableITCase extends 
KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
+        final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka 
-------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to offset=2
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 
'specific-offsets',\n"
+                                + "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
+                                + ")",
+                        topic, bootstraps, format, format);
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values to have more records past offset=2
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, 100, 'payload 1'),\n"
+                        + " (1, 100, 'payload 1-new'),\n"
+                        + " (2, 101, 'payload 2'),\n"
+                        + " (3, 102, 'payload 3')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should only have records up to offset=2
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1L, 100L, "payload 1"),
+                        changelogRow("-U", 1L, 100L, "payload 1"),
+                        changelogRow("+U", 1L, 100L, "payload 1-new"));
+        assertThat(results).satisfies(matching(deepEqualTo(expected, true)));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
+    @Test
+    public void testUpsertKafkaSourceSinkWithBoundedTimestamp() throws 
Exception {
+        final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka 
-------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to timestamp 
2023-03-10T14:00:00.000
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `timestamp` TIMESTAMP(3) METADATA,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 'timestamp',\n"
+                                + "  'scan.bounded.timestamp-millis' = '%d'"
+                                + ")",
+                        topic,
+                        bootstraps,
+                        format,
+                        format,
+                        LocalDateTime.parse("2023-03-10T14:00:00.000")
+                                .atZone(ZoneId.systemDefault())
+                                .toInstant()
+                                .toEpochMilli());
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values with timestamp starting from 2023-03-08 up 
to 2023-03-11
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, TIMESTAMP '2023-03-08 08:10:10.666', 100, 
'payload 1'),\n"
+                        + " (2, TIMESTAMP '2023-03-09 13:12:11.123', 101, 
'payload 2'),\n"
+                        + " (1, TIMESTAMP '2023-03-10 12:09:50.321', 100, 
'payload 1-new'),\n"
+                        + " (2, TIMESTAMP '2023-03-11 17:15:13.457', 101, 
'payload 2-new')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should only have records up to timestamp 
2023-03-10T14:00:00.000
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow(
+                                "+I",
+                                1L,
+                                LocalDateTime.parse("2023-03-08T08:10:10.666"),
+                                100L,
+                                "payload 1"),
+                        changelogRow(
+                                "+I",
+                                2L,
+                                LocalDateTime.parse("2023-03-09T13:12:11.123"),
+                                101L,
+                                "payload 2"),
+                        changelogRow(
+                                "-U",
+                                1L,
+                                LocalDateTime.parse("2023-03-08T08:10:10.666"),
+                                100L,
+                                "payload 1"),
+                        changelogRow(
+                                "+U",
+                                1L,
+                                LocalDateTime.parse("2023-03-10T12:09:50.321"),
+                                100L,
+                                "payload 1-new"));
+        assertThat(results).satisfies(matching(deepEqualTo(expected, true)));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
+    /**
+     * Tests that setting bounded end offset that is before the earliest 
offset results in 0
+     * results.
+     */
+    @Test
+    public void testUpsertKafkaSourceSinkWithZeroLengthBoundedness() throws 
Exception {
+        final String topic = "bounded_upsert_" + format + "_" + 
UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka 
-------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to timestamp 
2023-03-10T14:00:00.000
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `timestamp` TIMESTAMP(3) METADATA,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 'timestamp',\n"
+                                + "  'scan.bounded.timestamp-millis' = '%d'"
+                                + ")",
+                        topic,
+                        bootstraps,
+                        format,
+                        format,
+                        LocalDateTime.parse("2023-03-10T14:00:00.000")
+                                .atZone(ZoneId.systemDefault())
+                                .toInstant()
+                                .toEpochMilli());
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values with timestamp starting from 2023-03-11 
(which is past the bounded
+        // end timestamp)
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, TIMESTAMP '2023-03-11 08:10:10.666', 100, 
'payload 1'),\n"
+                        + " (2, TIMESTAMP '2023-03-12 13:12:11.123', 101, 
'payload 2'),\n"
+                        + " (1, TIMESTAMP '2023-03-13 12:09:50.321', 100, 
'payload 1-new'),\n"
+                        + " (2, TIMESTAMP '2023-03-14 17:15:13.457', 101, 
'payload 2-new')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should be empty
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+        
assertThat(results).satisfies(matching(deepEqualTo(Collections.emptyList(), 
true)));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
     private void wordCountToUpsertKafka(String wordCountTable) throws 
Exception {
         String bootstraps = getBootstrapServers();
 

Reply via email to