This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fd7e67f  [FLINK-24697][flink-connectors-kafka] add auto.offset.reset 
configuration for group-offsets startup mode
fd7e67f is described below

commit fd7e67f5509fc8f71892ac0ca8449d196f6254bc
Author: Hang Ruan <[email protected]>
AuthorDate: Fri Oct 29 16:11:23 2021 +0800

    [FLINK-24697][flink-connectors-kafka] add auto.offset.reset configuration 
for group-offsets startup mode
---
 .../connectors/kafka/table/KafkaDynamicSource.java |  28 +-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 329 +++++++++++++--------
 .../connectors/kafka/table/KafkaTableITCase.java   | 150 ++++++++++
 .../kafka/table/KafkaTableTestUtils.java           |  28 +-
 .../flink/core/testutils/FlinkAssertions.java      |  41 +++
 5 files changed, 430 insertions(+), 146 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index f26b803..4946671 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -50,7 +50,9 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 
@@ -62,6 +64,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -393,7 +396,13 @@ public class KafkaDynamicSource
                 
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                 break;
             case GROUP_OFFSETS:
-                
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
+                String offsetResetConfig =
+                        properties.getProperty(
+                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                OffsetResetStrategy.NONE.name());
+                OffsetResetStrategy offsetResetStrategy = 
getResetStrategy(offsetResetConfig);
+                kafkaSourceBuilder.setStartingOffsets(
+                        
OffsetsInitializer.committedOffsets(offsetResetStrategy));
                 break;
             case SPECIFIC_OFFSETS:
                 Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -417,6 +426,23 @@ public class KafkaDynamicSource
         return kafkaSourceBuilder.build();
     }
 
+    private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
+        return Arrays.stream(OffsetResetStrategy.values())
+                .filter(ors -> 
ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+                .findAny()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        String.format(
+                                                "%s can not be set to %s. 
Valid values: [%s]",
+                                                
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                                offsetResetConfig,
+                                                
Arrays.stream(OffsetResetStrategy.values())
+                                                        .map(Enum::name)
+                                                        
.map(String::toLowerCase)
+                                                        
.collect(Collectors.joining(",")))));
+    }
+
     private KafkaDeserializationSchema<RowData> 
createKafkaDeserializationSchema(
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index f21aa06..9cdce6d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -68,15 +68,16 @@ import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.assertj.core.api.Assertions;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
@@ -91,22 +92,19 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkAssertions.containsCause;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Abstract test base for {@link KafkaDynamicTableFactory}. */
-public class KafkaDynamicTableFactoryTest extends TestLogger {
-
-    @Rule public ExpectedException thrown = ExpectedException.none();
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaDynamicTableFactoryTest {
 
     private static final String TOPIC = "myTopic";
     private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
@@ -216,7 +214,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
                         0);
-        
Assertions.assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
                 
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -259,7 +257,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         specificOffsets,
                         0);
         final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) 
actualSource;
-        assertEquals(actualKafkaSource, expectedKafkaSource);
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
                 
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -300,7 +298,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         Collections.emptyMap(),
                         0);
 
-        assertEquals(actualSource, expectedKafkaSource);
+        assertThat(actualSource).isEqualTo(expectedKafkaSource);
     }
 
     @Test
@@ -353,7 +351,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
         expectedKafkaSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedKafkaSource.metadataKeys = 
Collections.singletonList("timestamp");
 
-        assertEquals(actualSource, expectedKafkaSource);
+        assertThat(actualSource).isEqualTo(expectedKafkaSource);
     }
 
     @Test
@@ -363,22 +361,73 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         getBasicSourceOptions(), options -> 
options.remove("properties.group.id"));
         final DynamicTableSource tableSource = createTableSource(SCHEMA, 
modifiedOptions);
 
-        assertThat(tableSource, instanceOf(KafkaDynamicSource.class));
+        assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
         ScanTableSource.ScanRuntimeProvider providerWithoutGroupId =
                 ((KafkaDynamicSource) tableSource)
                         
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-        assertThat(providerWithoutGroupId, 
instanceOf(DataStreamScanProvider.class));
+        
assertThat(providerWithoutGroupId).isInstanceOf(DataStreamScanProvider.class);
         final KafkaSource<?> kafkaSource = 
assertKafkaSource(providerWithoutGroupId);
         final Configuration configuration =
                 KafkaSourceTestUtils.getKafkaSourceConfiguration(kafkaSource);
 
         // Test offset commit on checkpoint should be disabled when do not set 
consumer group.
-        
assertFalse(configuration.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
-        assertFalse(
-                configuration.get(
-                        
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
-                                .booleanType()
-                                .noDefaultValue()));
+        
assertThat(configuration.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)).isFalse();
+        assertThat(
+                        configuration.get(
+                                
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()))
+                .isFalse();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"none", "earliest", "latest"})
+    @NullSource
+    public void testTableSourceSetOffsetReset(final String strategyName) {
+        testSetOffsetResetForStartFromGroupOffsets(strategyName);
+    }
+
+    @Test
+    public void testTableSourceSetOffsetResetWithException() {
+        String errorStrategy = "errorStrategy";
+        assertThatThrownBy(() -> testTableSourceSetOffsetReset(errorStrategy))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        String.format(
+                                "%s can not be set to %s. Valid values: 
[latest,earliest,none]",
+                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
errorStrategy));
+    }
+
+    private void testSetOffsetResetForStartFromGroupOffsets(String value) {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options -> {
+                            options.remove("scan.startup.mode");
+                            if (value == null) {
+                                return;
+                            }
+                            options.put(
+                                    PROPERTIES_PREFIX + 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                    value);
+                        });
+        final DynamicTableSource tableSource = createTableSource(SCHEMA, 
modifiedOptions);
+        assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
+        ScanTableSource.ScanRuntimeProvider provider =
+                ((KafkaDynamicSource) tableSource)
+                        
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final KafkaSource<?> kafkaSource = assertKafkaSource(provider);
+        final Configuration configuration =
+                KafkaSourceTestUtils.getKafkaSourceConfiguration(kafkaSource);
+
+        if (value == null) {
+            
assertThat(configuration.toMap().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+                    .isEqualTo("none");
+        } else {
+            
assertThat(configuration.toMap().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+                    .isEqualTo(value);
+        }
     }
 
     @Test
@@ -409,16 +458,16 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
                         "kafka-sink");
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
 
         // Test kafka producer.
         final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
         DynamicTableSink.SinkRuntimeProvider provider =
                 actualKafkaSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
-        assertThat(provider, instanceOf(SinkProvider.class));
+        assertThat(provider).isInstanceOf(SinkProvider.class);
         final SinkProvider sinkProvider = (SinkProvider) provider;
         final Sink<RowData, ?, ?, ?> sinkFunction = sinkProvider.createSink();
-        assertThat(sinkFunction, instanceOf(KafkaSink.class));
+        assertThat(sinkFunction).isInstanceOf(KafkaSink.class);
     }
 
     @Test
@@ -449,7 +498,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                             
DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
                             null,
                             "kafka-sink");
-            assertEquals(expectedSink, actualSink);
+            assertThat(actualSink).isEqualTo(expectedSink);
         }
     }
 
@@ -493,7 +542,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         null,
                         "kafka-sink");
 
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
     }
 
     @Test
@@ -520,14 +569,14 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         100,
                         "kafka-sink");
-        assertEquals(expectedSink, actualSink);
+        assertThat(actualSink).isEqualTo(expectedSink);
 
         final DynamicTableSink.SinkRuntimeProvider provider =
                 actualSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
-        assertThat(provider, instanceOf(SinkProvider.class));
+        assertThat(provider).isInstanceOf(SinkProvider.class);
         final SinkProvider sinkProvider = (SinkProvider) provider;
-        assertTrue(sinkProvider.getParallelism().isPresent());
-        assertEquals(100, (long) sinkProvider.getParallelism().get());
+        assertThat(sinkProvider.getParallelism().isPresent()).isTrue();
+        assertThat((long) sinkProvider.getParallelism().get()).isEqualTo(100);
     }
 
     @Test
@@ -642,7 +691,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
             } else {
                 expectedValueEncoder = createDebeziumAvroSerSchema(rowType, 
expectedValueSubject);
             }
-            assertEquals(expectedValueEncoder, actualValueEncoder);
+            assertThat(actualValueEncoder).isEqualTo(expectedValueEncoder);
         }
 
         if (avroFormats.contains(keyFormat)) {
@@ -656,7 +705,7 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
             } else {
                 expectedKeyEncoder = createDebeziumAvroSerSchema(rowType, 
expectedKeySubject);
             }
-            assertEquals(expectedKeyEncoder, actualKeyEncoder);
+            assertThat(actualKeyEncoder).isEqualTo(expectedKeyEncoder);
         }
     }
 
@@ -680,106 +729,129 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
 
     @Test
     public void testSourceTableWithTopicAndTopicPattern() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "Option 'topic' and 'topic-pattern' shouldn't 
be set together.")));
-
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getBasicSourceOptions(),
-                        options -> {
-                            options.put("topic", TOPICS);
-                            options.put("topic-pattern", TOPIC_REGEX);
-                        });
-
-        createTableSource(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getBasicSourceOptions(),
+                                            options -> {
+                                                options.put("topic", TOPICS);
+                                                options.put("topic-pattern", 
TOPIC_REGEX);
+                                            });
+
+                            createTableSource(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        containsCause(
+                                new ValidationException(
+                                        "Option 'topic' and 'topic-pattern' 
shouldn't be set together.")));
     }
 
     @Test
     public void testMissingStartupTimestamp() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "'scan.startup.timestamp-millis' "
-                                        + "is required in 'timestamp' startup 
mode but missing.")));
-
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getBasicSourceOptions(),
-                        options -> options.put("scan.startup.mode", 
"timestamp"));
-
-        createTableSource(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getBasicSourceOptions(),
+                                            options ->
+                                                    
options.put("scan.startup.mode", "timestamp"));
+
+                            createTableSource(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        containsCause(
+                                new ValidationException(
+                                        "'scan.startup.timestamp-millis' "
+                                                + "is required in 'timestamp' 
startup mode but missing.")));
     }
 
     @Test
     public void testMissingSpecificOffsets() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "'scan.startup.specific-offsets' "
-                                        + "is required in 'specific-offsets' 
startup mode but missing.")));
-
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getBasicSourceOptions(),
-                        options -> 
options.remove("scan.startup.specific-offsets"));
-
-        createTableSource(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getBasicSourceOptions(),
+                                            options ->
+                                                    options.remove(
+                                                            
"scan.startup.specific-offsets"));
+
+                            createTableSource(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        containsCause(
+                                new ValidationException(
+                                        "'scan.startup.specific-offsets' "
+                                                + "is required in 
'specific-offsets' startup mode but missing.")));
     }
 
     @Test
     public void testInvalidSinkPartitioner() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "Could not find and instantiate partitioner " 
+ "class 'abc'")));
-
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getBasicSinkOptions(), options -> 
options.put("sink.partitioner", "abc"));
-
-        createTableSink(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getBasicSinkOptions(),
+                                            options -> 
options.put("sink.partitioner", "abc"));
+
+                            createTableSink(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        containsCause(
+                                new ValidationException(
+                                        "Could not find and instantiate 
partitioner "
+                                                + "class 'abc'")));
     }
 
     @Test
     public void testInvalidRoundRobinPartitionerWithKeyFields() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "Currently 'round-robin' partitioner only 
works "
-                                        + "when option 'key.fields' is not 
specified.")));
-
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getKeyValueOptions(),
-                        options -> options.put("sink.partitioner", 
"round-robin"));
-
-        createTableSink(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getKeyValueOptions(),
+                                            options ->
+                                                    
options.put("sink.partitioner", "round-robin"));
+
+                            createTableSink(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        containsCause(
+                                new ValidationException(
+                                        "Currently 'round-robin' partitioner 
only works "
+                                                + "when option 'key.fields' is 
not specified.")));
     }
 
     @Test
     public void testExactlyOnceGuaranteeWithoutTransactionalIdPrefix() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "sink.transactional-id-prefix must be 
specified when using DeliveryGuarantee.EXACTLY_ONCE.")));
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getKeyValueOptions(),
-                        options -> {
-                            
options.remove(KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX.key());
-                            options.put(
-                                    
KafkaConnectorOptions.DELIVERY_GUARANTEE.key(),
-                                    DeliveryGuarantee.EXACTLY_ONCE.toString());
-                        });
-        createTableSink(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getKeyValueOptions(),
+                                            options -> {
+                                                options.remove(
+                                                        KafkaConnectorOptions
+                                                                
.TRANSACTIONAL_ID_PREFIX
+                                                                .key());
+                                                options.put(
+                                                        
KafkaConnectorOptions.DELIVERY_GUARANTEE
+                                                                .key(),
+                                                        
DeliveryGuarantee.EXACTLY_ONCE.toString());
+                                            });
+                            createTableSink(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        containsCause(
+                                new ValidationException(
+                                        "sink.transactional-id-prefix must be 
specified when using DeliveryGuarantee.EXACTLY_ONCE.")));
     }
 
     @Test
@@ -798,12 +870,12 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
         try {
             createTableSink(SCHEMA, modifiedOptions);
         } catch (Throwable t) {
-            assertEquals(
-                    String.format(
-                            errorMessageTemp,
-                            "'topic'",
-                            String.format("[%s]", String.join(", ", 
TOPIC_LIST))),
-                    t.getCause().getMessage());
+            assertThat(t.getCause().getMessage())
+                    .isEqualTo(
+                            String.format(
+                                    errorMessageTemp,
+                                    "'topic'",
+                                    String.format("[%s]", String.join(", ", 
TOPIC_LIST))));
         }
 
         modifiedOptions =
@@ -814,9 +886,8 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
         try {
             createTableSink(SCHEMA, modifiedOptions);
         } catch (Throwable t) {
-            assertEquals(
-                    String.format(errorMessageTemp, "'topic-pattern'", 
TOPIC_REGEX),
-                    t.getCause().getMessage());
+            assertThat(t.getCause().getMessage())
+                    .isEqualTo(String.format(errorMessageTemp, 
"'topic-pattern'", TOPIC_REGEX));
         }
     }
 
@@ -1032,18 +1103,18 @@ public class KafkaDynamicTableFactoryTest extends 
TestLogger {
     }
 
     private KafkaSource<?> 
assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
-        assertThat(provider, instanceOf(DataStreamScanProvider.class));
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
         final DataStreamScanProvider dataStreamScanProvider = 
(DataStreamScanProvider) provider;
         final Transformation<RowData> transformation =
                 dataStreamScanProvider
                         
.produceDataStream(StreamExecutionEnvironment.createLocalEnvironment())
                         .getTransformation();
-        assertThat(transformation, instanceOf(SourceTransformation.class));
+        assertThat(transformation).isInstanceOf(SourceTransformation.class);
         SourceTransformation<RowData, KafkaPartitionSplit, 
KafkaSourceEnumState>
                 sourceTransformation =
                         (SourceTransformation<RowData, KafkaPartitionSplit, 
KafkaSourceEnumState>)
                                 transformation;
-        assertThat(sourceTransformation.getSource(), 
instanceOf(KafkaSource.class));
+        
assertThat(sourceTransformation.getSource()).isInstanceOf(KafkaSource.class);
         return (KafkaSource<?>) sourceTransformation.getSource();
     }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index e2a4305..0dfd890 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -27,6 +28,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.types.Row;
 
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,6 +45,8 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -821,6 +826,151 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testStartFromGroupOffsetsLatest() throws Exception {
+        testStartFromGroupOffsets("latest");
+    }
+
+    @Test
+    public void testStartFromGroupOffsetsEarliest() throws Exception {
+        testStartFromGroupOffsets("earliest");
+    }
+
+    @Test
+    public void testStartFromGroupOffsetsNone() {
+        Assertions.assertThatThrownBy(() -> 
testStartFromGroupOffsetsWithNoneResetStrategy())
+                
.satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class));
+    }
+
+    private List<String> appendNewData(String tableName)
+            throws ExecutionException, InterruptedException {
+        String appendValues =
+                "INSERT INTO "
+                        + tableName
+                        + "\n"
+                        + "VALUES\n"
+                        + " (2, 6),\n"
+                        + " (2, 7),\n"
+                        + " (2, 8)\n";
+        tEnv.executeSql(appendValues).await();
+        return Arrays.asList("+I[2, 6]", "+I[2, 7]", "+I[2, 8]");
+    }
+
+    private TableResult startFromGroupOffset(
+            String tableName, String topic, String groupId, String 
resetStrategy, String sinkName)
+            throws ExecutionException, InterruptedException {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        createTestTopic(topic, 4, 1);
+
+        // ---------- Produce an event time stream into Kafka 
-------------------
+        String bootstraps = getBootstrapServers();
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+
+        final String createTableSql =
+                "CREATE TABLE %s (\n"
+                        + "  `partition_id` INT,\n"
+                        + "  `value` INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'kafka',\n"
+                        + "  'topic' = '%s',\n"
+                        + "  'properties.bootstrap.servers' = '%s',\n"
+                        + "  'properties.group.id' = '%s',\n"
+                        + "  'scan.startup.mode' = 'group-offsets',\n"
+                        + "  'properties.auto.offset.reset' = '%s',\n"
+                        + "  'format' = '%s'\n"
+                        + ")";
+        tEnv.executeSql(
+                String.format(
+                        createTableSql,
+                        tableName,
+                        topic,
+                        bootstraps,
+                        groupId,
+                        resetStrategy,
+                        format));
+
+        String initialValues =
+                "INSERT INTO "
+                        + tableName
+                        + "\n"
+                        + "VALUES\n"
+                        + " (0, 0),\n"
+                        + " (0, 1),\n"
+                        + " (0, 2),\n"
+                        + " (1, 3),\n"
+                        + " (1, 4),\n"
+                        + " (1, 5)\n";
+        tEnv.executeSql(initialValues).await();
+
+        // ---------- Consume stream from Kafka -------------------
+
+        env.setParallelism(1);
+        String createSink =
+                "CREATE TABLE "
+                        + sinkName
+                        + "(\n"
+                        + "  `partition_id` INT,\n"
+                        + "  `value` INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values'\n"
+                        + ")";
+        tEnv.executeSql(createSink);
+
+        return tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM " + 
tableName);
+    }
+
+    private void testStartFromGroupOffsets(String resetStrategy) throws 
Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String tableName = "Table" + format + resetStrategy;
+        final String topic = "groupOffset_" + format + resetStrategy;
+        String groupId = format + resetStrategy;
+        String sinkName = "mySink" + format + resetStrategy;
+        List<String> expected =
+                Arrays.asList(
+                        "+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 3]", "+I[1, 
4]", "+I[1, 5]");
+
+        TableResult tableResult = null;
+        try {
+            tableResult = startFromGroupOffset(tableName, topic, groupId, 
resetStrategy, sinkName);
+            if ("latest".equals(resetStrategy)) {
+                expected = appendNewData(tableName);
+            }
+            KafkaTableTestUtils.waitingExpectedResults(sinkName, expected, 
Duration.ofSeconds(5));
+        } finally {
+            // ------------- cleanup -------------------
+            if (tableResult != null) {
+                tableResult.getJobClient().ifPresent(JobClient::cancel);
+            }
+            deleteTestTopic(topic);
+        }
+    }
+
+    private void testStartFromGroupOffsetsWithNoneResetStrategy()
+            throws ExecutionException, InterruptedException {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String resetStrategy = "none";
+        final String tableName = resetStrategy + "Table";
+        final String topic = "groupOffset_" + format;
+        String groupId = resetStrategy + (new Random()).nextInt();
+
+        TableResult tableResult = null;
+        try {
+            tableResult = startFromGroupOffset(tableName, topic, groupId, 
resetStrategy, "MySink");
+            tableResult.await();
+        } finally {
+            // ------------- cleanup -------------------
+            if (tableResult != null) {
+                tableResult.getJobClient().ifPresent(JobClient::cancel);
+            }
+            deleteTestTopic(topic);
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Utilities
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
index f585fd4..bfc21d9 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -39,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -74,23 +76,17 @@ public class KafkaTableTestUtils {
     }
 
     public static void waitingExpectedResults(
-            String sinkName, List<String> expected, Duration timeout) throws 
InterruptedException {
-        long now = System.currentTimeMillis();
-        long stop = now + timeout.toMillis();
+            String sinkName, List<String> expected, Duration timeout)
+            throws InterruptedException, TimeoutException {
         Collections.sort(expected);
-        while (System.currentTimeMillis() < stop) {
-            List<String> actual = TestValuesTableFactory.getResults(sinkName);
-            Collections.sort(actual);
-            if (expected.equals(actual)) {
-                return;
-            }
-            Thread.sleep(100);
-        }
-
-        // timeout, assert again
-        List<String> actual = TestValuesTableFactory.getResults(sinkName);
-        Collections.sort(actual);
-        assertEquals(expected, actual);
+        CommonTestUtils.waitUtil(
+                () -> {
+                    List<String> actual = 
TestValuesTableFactory.getResults(sinkName);
+                    Collections.sort(actual);
+                    return expected.equals(actual);
+                },
+                timeout,
+                "Can not get the expected result.");
     }
 
     public static void comparedWithKeyAndOrder(
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
index e732bf0..0164610 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
@@ -65,6 +65,47 @@ public final class FlinkAssertions {
     }
 
     /**
+     * Shorthand to assert the chain of causes includes a specific {@link 
Throwable}. Same as:
+     *
+     * <pre>{@code
+     * assertThatChainOfCauses(t)
+     *     .anySatisfy(
+     *          cause ->
+     *              assertThat(cause)
+     *                  .isInstanceOf(throwable.getClass())
+     *                  .hasMessage(throwable.getMessage()));
+     * }</pre>
+     */
+    public static ThrowingConsumer<? super Throwable> containsCause(Throwable 
throwable) {
+        return t ->
+                assertThatChainOfCauses(t)
+                        .anySatisfy(
+                                cause ->
+                                        assertThat(cause)
+                                                
.isInstanceOf(throwable.getClass())
+                                                
.hasMessage(throwable.getMessage()));
+    }
+
+    /**
+     * Shorthand to assert the chain of causes includes a {@link Throwable} 
matching a specific
+     * {@link Class}. Same as:
+     *
+     * <pre>{@code
+     * assertThatChainOfCauses(throwable)
+     *     .anySatisfy(
+     *          cause ->
+     *              assertThat(cause)
+     *                  .isInstanceOf(clazz));
+     * }</pre>
+     */
+    public static ThrowingConsumer<? super Throwable> anyCauseMatches(
+            Class<? extends Throwable> clazz) {
+        return t ->
+                assertThatChainOfCauses(t)
+                        .anySatisfy(cause -> 
assertThat(cause).isInstanceOf(clazz));
+    }
+
+    /**
      * Shorthand to assert the chain of causes includes a {@link Throwable} 
matching a specific
      * {@link Class} and containing the provided message. Same as:
      *

Reply via email to