This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fd7e67f [FLINK-24697][flink-connectors-kafka] add auto.offset.reset
configuration for group-offsets startup mode
fd7e67f is described below
commit fd7e67f5509fc8f71892ac0ca8449d196f6254bc
Author: Hang Ruan <[email protected]>
AuthorDate: Fri Oct 29 16:11:23 2021 +0800
[FLINK-24697][flink-connectors-kafka] add auto.offset.reset configuration
for group-offsets startup mode
---
.../connectors/kafka/table/KafkaDynamicSource.java | 28 +-
.../kafka/table/KafkaDynamicTableFactoryTest.java | 329 +++++++++++++--------
.../connectors/kafka/table/KafkaTableITCase.java | 150 ++++++++++
.../kafka/table/KafkaTableTestUtils.java | 28 +-
.../flink/core/testutils/FlinkAssertions.java | 41 +++
5 files changed, 430 insertions(+), 146 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index f26b803..4946671 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -50,7 +50,9 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
@@ -62,6 +64,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -393,7 +396,13 @@ public class KafkaDynamicSource
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
break;
case GROUP_OFFSETS:
-
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
+ String offsetResetConfig =
+ properties.getProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.NONE.name());
+ OffsetResetStrategy offsetResetStrategy =
getResetStrategy(offsetResetConfig);
+ kafkaSourceBuilder.setStartingOffsets(
+
OffsetsInitializer.committedOffsets(offsetResetStrategy));
break;
case SPECIFIC_OFFSETS:
Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -417,6 +426,23 @@ public class KafkaDynamicSource
return kafkaSourceBuilder.build();
}
+ private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
+ return Arrays.stream(OffsetResetStrategy.values())
+ .filter(ors ->
ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+ .findAny()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ "%s can not be set to %s.
Valid values: [%s]",
+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ offsetResetConfig,
+
Arrays.stream(OffsetResetStrategy.values())
+ .map(Enum::name)
+
.map(String::toLowerCase)
+
.collect(Collectors.joining(",")))));
+ }
+
private KafkaDeserializationSchema<RowData>
createKafkaDeserializationSchema(
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index f21aa06..9cdce6d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -68,15 +68,16 @@ import
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.assertj.core.api.Assertions;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.NullSource;
+import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
@@ -91,22 +92,19 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkAssertions.containsCause;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Abstract test base for {@link KafkaDynamicTableFactory}. */
-public class KafkaDynamicTableFactoryTest extends TestLogger {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaDynamicTableFactoryTest {
private static final String TOPIC = "myTopic";
private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
@@ -216,7 +214,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
-
Assertions.assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+ assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -259,7 +257,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
specificOffsets,
0);
final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource)
actualSource;
- assertEquals(actualKafkaSource, expectedKafkaSource);
+ assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -300,7 +298,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
Collections.emptyMap(),
0);
- assertEquals(actualSource, expectedKafkaSource);
+ assertThat(actualSource).isEqualTo(expectedKafkaSource);
}
@Test
@@ -353,7 +351,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
expectedKafkaSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedKafkaSource.metadataKeys =
Collections.singletonList("timestamp");
- assertEquals(actualSource, expectedKafkaSource);
+ assertThat(actualSource).isEqualTo(expectedKafkaSource);
}
@Test
@@ -363,22 +361,73 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
getBasicSourceOptions(), options ->
options.remove("properties.group.id"));
final DynamicTableSource tableSource = createTableSource(SCHEMA,
modifiedOptions);
- assertThat(tableSource, instanceOf(KafkaDynamicSource.class));
+ assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
ScanTableSource.ScanRuntimeProvider providerWithoutGroupId =
((KafkaDynamicSource) tableSource)
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
- assertThat(providerWithoutGroupId,
instanceOf(DataStreamScanProvider.class));
+
assertThat(providerWithoutGroupId).isInstanceOf(DataStreamScanProvider.class);
final KafkaSource<?> kafkaSource =
assertKafkaSource(providerWithoutGroupId);
final Configuration configuration =
KafkaSourceTestUtils.getKafkaSourceConfiguration(kafkaSource);
// Test offset commit on checkpoint should be disabled when do not set
consumer group.
-
assertFalse(configuration.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
- assertFalse(
- configuration.get(
-
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
- .booleanType()
- .noDefaultValue()));
+
assertThat(configuration.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)).isFalse();
+ assertThat(
+ configuration.get(
+
ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+ .booleanType()
+ .noDefaultValue()))
+ .isFalse();
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"none", "earliest", "latest"})
+ @NullSource
+ public void testTableSourceSetOffsetReset(final String strategyName) {
+ testSetOffsetResetForStartFromGroupOffsets(strategyName);
+ }
+
+ @Test
+ public void testTableSourceSetOffsetResetWithException() {
+ String errorStrategy = "errorStrategy";
+ assertThatThrownBy(() -> testTableSourceSetOffsetReset(errorStrategy))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ String.format(
+ "%s can not be set to %s. Valid values:
[latest,earliest,none]",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
errorStrategy));
+ }
+
+ private void testSetOffsetResetForStartFromGroupOffsets(String value) {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options -> {
+ options.remove("scan.startup.mode");
+ if (value == null) {
+ return;
+ }
+ options.put(
+ PROPERTIES_PREFIX +
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ value);
+ });
+ final DynamicTableSource tableSource = createTableSource(SCHEMA,
modifiedOptions);
+ assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
+ ScanTableSource.ScanRuntimeProvider provider =
+ ((KafkaDynamicSource) tableSource)
+
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+ final KafkaSource<?> kafkaSource = assertKafkaSource(provider);
+ final Configuration configuration =
+ KafkaSourceTestUtils.getKafkaSourceConfiguration(kafkaSource);
+
+ if (value == null) {
+
assertThat(configuration.toMap().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+ .isEqualTo("none");
+ } else {
+
assertThat(configuration.toMap().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+ .isEqualTo(value);
+ }
}
@Test
@@ -409,16 +458,16 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
DeliveryGuarantee.EXACTLY_ONCE,
null,
"kafka-sink");
- assertEquals(expectedSink, actualSink);
+ assertThat(actualSink).isEqualTo(expectedSink);
// Test kafka producer.
final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
DynamicTableSink.SinkRuntimeProvider provider =
actualKafkaSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
- assertThat(provider, instanceOf(SinkProvider.class));
+ assertThat(provider).isInstanceOf(SinkProvider.class);
final SinkProvider sinkProvider = (SinkProvider) provider;
final Sink<RowData, ?, ?, ?> sinkFunction = sinkProvider.createSink();
- assertThat(sinkFunction, instanceOf(KafkaSink.class));
+ assertThat(sinkFunction).isInstanceOf(KafkaSink.class);
}
@Test
@@ -449,7 +498,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
null,
"kafka-sink");
- assertEquals(expectedSink, actualSink);
+ assertThat(actualSink).isEqualTo(expectedSink);
}
}
@@ -493,7 +542,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
null,
"kafka-sink");
- assertEquals(expectedSink, actualSink);
+ assertThat(actualSink).isEqualTo(expectedSink);
}
@Test
@@ -520,14 +569,14 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
DeliveryGuarantee.EXACTLY_ONCE,
100,
"kafka-sink");
- assertEquals(expectedSink, actualSink);
+ assertThat(actualSink).isEqualTo(expectedSink);
final DynamicTableSink.SinkRuntimeProvider provider =
actualSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
- assertThat(provider, instanceOf(SinkProvider.class));
+ assertThat(provider).isInstanceOf(SinkProvider.class);
final SinkProvider sinkProvider = (SinkProvider) provider;
- assertTrue(sinkProvider.getParallelism().isPresent());
- assertEquals(100, (long) sinkProvider.getParallelism().get());
+ assertThat(sinkProvider.getParallelism().isPresent()).isTrue();
+ assertThat((long) sinkProvider.getParallelism().get()).isEqualTo(100);
}
@Test
@@ -642,7 +691,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
} else {
expectedValueEncoder = createDebeziumAvroSerSchema(rowType,
expectedValueSubject);
}
- assertEquals(expectedValueEncoder, actualValueEncoder);
+ assertThat(actualValueEncoder).isEqualTo(expectedValueEncoder);
}
if (avroFormats.contains(keyFormat)) {
@@ -656,7 +705,7 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
} else {
expectedKeyEncoder = createDebeziumAvroSerSchema(rowType,
expectedKeySubject);
}
- assertEquals(expectedKeyEncoder, actualKeyEncoder);
+ assertThat(actualKeyEncoder).isEqualTo(expectedKeyEncoder);
}
}
@@ -680,106 +729,129 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
@Test
public void testSourceTableWithTopicAndTopicPattern() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "Option 'topic' and 'topic-pattern' shouldn't
be set together.")));
-
- final Map<String, String> modifiedOptions =
- getModifiedOptions(
- getBasicSourceOptions(),
- options -> {
- options.put("topic", TOPICS);
- options.put("topic-pattern", TOPIC_REGEX);
- });
-
- createTableSource(SCHEMA, modifiedOptions);
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options -> {
+ options.put("topic", TOPICS);
+ options.put("topic-pattern",
TOPIC_REGEX);
+ });
+
+ createTableSource(SCHEMA, modifiedOptions);
+ })
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ containsCause(
+ new ValidationException(
+ "Option 'topic' and 'topic-pattern'
shouldn't be set together.")));
}
@Test
public void testMissingStartupTimestamp() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "'scan.startup.timestamp-millis' "
- + "is required in 'timestamp' startup
mode but missing.")));
-
- final Map<String, String> modifiedOptions =
- getModifiedOptions(
- getBasicSourceOptions(),
- options -> options.put("scan.startup.mode",
"timestamp"));
-
- createTableSource(SCHEMA, modifiedOptions);
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options ->
+
options.put("scan.startup.mode", "timestamp"));
+
+ createTableSource(SCHEMA, modifiedOptions);
+ })
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ containsCause(
+ new ValidationException(
+ "'scan.startup.timestamp-millis' "
+ + "is required in 'timestamp'
startup mode but missing.")));
}
@Test
public void testMissingSpecificOffsets() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "'scan.startup.specific-offsets' "
- + "is required in 'specific-offsets'
startup mode but missing.")));
-
- final Map<String, String> modifiedOptions =
- getModifiedOptions(
- getBasicSourceOptions(),
- options ->
options.remove("scan.startup.specific-offsets"));
-
- createTableSource(SCHEMA, modifiedOptions);
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options ->
+ options.remove(
+
"scan.startup.specific-offsets"));
+
+ createTableSource(SCHEMA, modifiedOptions);
+ })
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ containsCause(
+ new ValidationException(
+ "'scan.startup.specific-offsets' "
+ + "is required in
'specific-offsets' startup mode but missing.")));
}
@Test
public void testInvalidSinkPartitioner() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "Could not find and instantiate partitioner "
+ "class 'abc'")));
-
- final Map<String, String> modifiedOptions =
- getModifiedOptions(
- getBasicSinkOptions(), options ->
options.put("sink.partitioner", "abc"));
-
- createTableSink(SCHEMA, modifiedOptions);
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSinkOptions(),
+ options ->
options.put("sink.partitioner", "abc"));
+
+ createTableSink(SCHEMA, modifiedOptions);
+ })
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ containsCause(
+ new ValidationException(
+ "Could not find and instantiate
partitioner "
+ + "class 'abc'")));
}
@Test
public void testInvalidRoundRobinPartitionerWithKeyFields() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "Currently 'round-robin' partitioner only
works "
- + "when option 'key.fields' is not
specified.")));
-
- final Map<String, String> modifiedOptions =
- getModifiedOptions(
- getKeyValueOptions(),
- options -> options.put("sink.partitioner",
"round-robin"));
-
- createTableSink(SCHEMA, modifiedOptions);
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getKeyValueOptions(),
+ options ->
+
options.put("sink.partitioner", "round-robin"));
+
+ createTableSink(SCHEMA, modifiedOptions);
+ })
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ containsCause(
+ new ValidationException(
+ "Currently 'round-robin' partitioner
only works "
+ + "when option 'key.fields' is
not specified.")));
}
@Test
public void testExactlyOnceGuaranteeWithoutTransactionalIdPrefix() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "sink.transactional-id-prefix must be
specified when using DeliveryGuarantee.EXACTLY_ONCE.")));
- final Map<String, String> modifiedOptions =
- getModifiedOptions(
- getKeyValueOptions(),
- options -> {
-
options.remove(KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX.key());
- options.put(
-
KafkaConnectorOptions.DELIVERY_GUARANTEE.key(),
- DeliveryGuarantee.EXACTLY_ONCE.toString());
- });
- createTableSink(SCHEMA, modifiedOptions);
+ assertThatThrownBy(
+ () -> {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getKeyValueOptions(),
+ options -> {
+ options.remove(
+ KafkaConnectorOptions
+
.TRANSACTIONAL_ID_PREFIX
+ .key());
+ options.put(
+
KafkaConnectorOptions.DELIVERY_GUARANTEE
+ .key(),
+
DeliveryGuarantee.EXACTLY_ONCE.toString());
+ });
+ createTableSink(SCHEMA, modifiedOptions);
+ })
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ containsCause(
+ new ValidationException(
+ "sink.transactional-id-prefix must be
specified when using DeliveryGuarantee.EXACTLY_ONCE.")));
}
@Test
@@ -798,12 +870,12 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
try {
createTableSink(SCHEMA, modifiedOptions);
} catch (Throwable t) {
- assertEquals(
- String.format(
- errorMessageTemp,
- "'topic'",
- String.format("[%s]", String.join(", ",
TOPIC_LIST))),
- t.getCause().getMessage());
+ assertThat(t.getCause().getMessage())
+ .isEqualTo(
+ String.format(
+ errorMessageTemp,
+ "'topic'",
+ String.format("[%s]", String.join(", ",
TOPIC_LIST))));
}
modifiedOptions =
@@ -814,9 +886,8 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
try {
createTableSink(SCHEMA, modifiedOptions);
} catch (Throwable t) {
- assertEquals(
- String.format(errorMessageTemp, "'topic-pattern'",
TOPIC_REGEX),
- t.getCause().getMessage());
+ assertThat(t.getCause().getMessage())
+ .isEqualTo(String.format(errorMessageTemp,
"'topic-pattern'", TOPIC_REGEX));
}
}
@@ -1032,18 +1103,18 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
}
private KafkaSource<?>
assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
- assertThat(provider, instanceOf(DataStreamScanProvider.class));
+ assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
final DataStreamScanProvider dataStreamScanProvider =
(DataStreamScanProvider) provider;
final Transformation<RowData> transformation =
dataStreamScanProvider
.produceDataStream(StreamExecutionEnvironment.createLocalEnvironment())
.getTransformation();
- assertThat(transformation, instanceOf(SourceTransformation.class));
+ assertThat(transformation).isInstanceOf(SourceTransformation.class);
SourceTransformation<RowData, KafkaPartitionSplit,
KafkaSourceEnumState>
sourceTransformation =
(SourceTransformation<RowData, KafkaPartitionSplit,
KafkaSourceEnumState>)
transformation;
- assertThat(sourceTransformation.getSource(),
instanceOf(KafkaSource.class));
+
assertThat(sourceTransformation.getSource()).isInstanceOf(KafkaSource.class);
return (KafkaSource<?>) sourceTransformation.getSource();
}
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index e2a4305..0dfd890 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -27,6 +28,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.types.Row;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,6 +45,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -821,6 +826,151 @@ public class KafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
+ @Test
+ public void testStartFromGroupOffsetsLatest() throws Exception {
+ testStartFromGroupOffsets("latest");
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsEarliest() throws Exception {
+ testStartFromGroupOffsets("earliest");
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsNone() {
+ Assertions.assertThatThrownBy(() ->
testStartFromGroupOffsetsWithNoneResetStrategy())
+
.satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class));
+ }
+
+ private List<String> appendNewData(String tableName)
+ throws ExecutionException, InterruptedException {
+ String appendValues =
+ "INSERT INTO "
+ + tableName
+ + "\n"
+ + "VALUES\n"
+ + " (2, 6),\n"
+ + " (2, 7),\n"
+ + " (2, 8)\n";
+ tEnv.executeSql(appendValues).await();
+ return Arrays.asList("+I[2, 6]", "+I[2, 7]", "+I[2, 8]");
+ }
+
+ private TableResult startFromGroupOffset(
+ String tableName, String topic, String groupId, String
resetStrategy, String sinkName)
+ throws ExecutionException, InterruptedException {
+ // we always use a different topic name for each parameterized topic,
+ // in order to make sure the topic can be created.
+ createTestTopic(topic, 4, 1);
+
+ // ---------- Produce an event time stream into Kafka
-------------------
+ String bootstraps = getBootstrapServers();
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+
+ final String createTableSql =
+ "CREATE TABLE %s (\n"
+ + " `partition_id` INT,\n"
+ + " `value` INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' = 'group-offsets',\n"
+ + " 'properties.auto.offset.reset' = '%s',\n"
+ + " 'format' = '%s'\n"
+ + ")";
+ tEnv.executeSql(
+ String.format(
+ createTableSql,
+ tableName,
+ topic,
+ bootstraps,
+ groupId,
+ resetStrategy,
+ format));
+
+ String initialValues =
+ "INSERT INTO "
+ + tableName
+ + "\n"
+ + "VALUES\n"
+ + " (0, 0),\n"
+ + " (0, 1),\n"
+ + " (0, 2),\n"
+ + " (1, 3),\n"
+ + " (1, 4),\n"
+ + " (1, 5)\n";
+ tEnv.executeSql(initialValues).await();
+
+ // ---------- Consume stream from Kafka -------------------
+
+ env.setParallelism(1);
+ String createSink =
+ "CREATE TABLE "
+ + sinkName
+ + "(\n"
+ + " `partition_id` INT,\n"
+ + " `value` INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values'\n"
+ + ")";
+ tEnv.executeSql(createSink);
+
+ return tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM " +
tableName);
+ }
+
+ private void testStartFromGroupOffsets(String resetStrategy) throws
Exception {
+ // we always use a different topic name for each parameterized topic,
+ // in order to make sure the topic can be created.
+ final String tableName = "Table" + format + resetStrategy;
+ final String topic = "groupOffset_" + format + resetStrategy;
+ String groupId = format + resetStrategy;
+ String sinkName = "mySink" + format + resetStrategy;
+ List<String> expected =
+ Arrays.asList(
+ "+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 3]", "+I[1,
4]", "+I[1, 5]");
+
+ TableResult tableResult = null;
+ try {
+ tableResult = startFromGroupOffset(tableName, topic, groupId,
resetStrategy, sinkName);
+ if ("latest".equals(resetStrategy)) {
+ expected = appendNewData(tableName);
+ }
+ KafkaTableTestUtils.waitingExpectedResults(sinkName, expected,
Duration.ofSeconds(5));
+ } finally {
+ // ------------- cleanup -------------------
+ if (tableResult != null) {
+ tableResult.getJobClient().ifPresent(JobClient::cancel);
+ }
+ deleteTestTopic(topic);
+ }
+ }
+
+ private void testStartFromGroupOffsetsWithNoneResetStrategy()
+ throws ExecutionException, InterruptedException {
+ // we always use a different topic name for each parameterized topic,
+ // in order to make sure the topic can be created.
+ final String resetStrategy = "none";
+ final String tableName = resetStrategy + "Table";
+ final String topic = "groupOffset_" + format;
+ String groupId = resetStrategy + (new Random()).nextInt();
+
+ TableResult tableResult = null;
+ try {
+ tableResult = startFromGroupOffset(tableName, topic, groupId,
resetStrategy, "MySink");
+ tableResult.await();
+ } finally {
+ // ------------- cleanup -------------------
+ if (tableResult != null) {
+ tableResult.getJobClient().ifPresent(JobClient::cancel);
+ }
+ deleteTestTopic(topic);
+ }
+ }
+
//
--------------------------------------------------------------------------------------------
// Utilities
//
--------------------------------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
index f585fd4..bfc21d9 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.table;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -39,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -74,23 +76,17 @@ public class KafkaTableTestUtils {
}
public static void waitingExpectedResults(
- String sinkName, List<String> expected, Duration timeout) throws
InterruptedException {
- long now = System.currentTimeMillis();
- long stop = now + timeout.toMillis();
+ String sinkName, List<String> expected, Duration timeout)
+ throws InterruptedException, TimeoutException {
Collections.sort(expected);
- while (System.currentTimeMillis() < stop) {
- List<String> actual = TestValuesTableFactory.getResults(sinkName);
- Collections.sort(actual);
- if (expected.equals(actual)) {
- return;
- }
- Thread.sleep(100);
- }
-
- // timeout, assert again
- List<String> actual = TestValuesTableFactory.getResults(sinkName);
- Collections.sort(actual);
- assertEquals(expected, actual);
+ CommonTestUtils.waitUtil(
+ () -> {
+ List<String> actual =
TestValuesTableFactory.getResults(sinkName);
+ Collections.sort(actual);
+ return expected.equals(actual);
+ },
+ timeout,
+ "Can not get the expected result.");
}
public static void comparedWithKeyAndOrder(
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
index e732bf0..0164610 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
@@ -65,6 +65,47 @@ public final class FlinkAssertions {
}
/**
+ * Shorthand to assert the chain of causes includes a specific {@link
Throwable}. Same as:
+ *
+ * <pre>{@code
+ * assertThatChainOfCauses(t)
+ * .anySatisfy(
+ * cause ->
+ * assertThat(cause)
+ * .isInstanceOf(throwable.getClass())
+ * .hasMessage(throwable.getMessage()));
+ * }</pre>
+ */
+ public static ThrowingConsumer<? super Throwable> containsCause(Throwable
throwable) {
+ return t ->
+ assertThatChainOfCauses(t)
+ .anySatisfy(
+ cause ->
+ assertThat(cause)
+
.isInstanceOf(throwable.getClass())
+
.hasMessage(throwable.getMessage()));
+ }
+
+ /**
+ * Shorthand to assert the chain of causes includes a {@link Throwable}
matching a specific
+ * {@link Class}. Same as:
+ *
+ * <pre>{@code
+ * assertThatChainOfCauses(throwable)
+ * .anySatisfy(
+ * cause ->
+ * assertThat(cause)
+ * .isInstanceOf(clazz));
+ * }</pre>
+ */
+ public static ThrowingConsumer<? super Throwable> anyCauseMatches(
+ Class<? extends Throwable> clazz) {
+ return t ->
+ assertThatChainOfCauses(t)
+ .anySatisfy(cause ->
assertThat(cause).isInstanceOf(clazz));
+ }
+
+ /**
* Shorthand to assert the chain of causes includes a {@link Throwable}
matching a specific
* {@link Class} and containing the provided message. Same as:
*