fapaul commented on a change in pull request #17601:
URL: https://github.com/apache/flink/pull/17601#discussion_r784796744
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -393,7 +396,27 @@ public int hashCode() {
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
break;
case GROUP_OFFSETS:
-                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
+                String offsetResetConfig =
+                        properties.getProperty(
+                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                OffsetResetStrategy.NONE.name());
+                OffsetResetStrategy offsetResetStrategy =
+                        Arrays.stream(OffsetResetStrategy.values())
+                                .filter(
+                                        ors ->
+                                                ors.name()
+                                                        .equals(
+                                                                offsetResetConfig.toUpperCase(
+                                                                        Locale.ROOT)))
+                                .findAny()
+                                .orElse(null);
+                Preconditions.checkArgument(
Review comment:
Nit: you can immediately throw the `IllegalArgumentException` with
`.orElseThrow()`
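The suggestion can be sketched as follows. This is a minimal, self-contained approximation: the enum here stands in for Kafka's `OffsetResetStrategy`, and `parse` is a hypothetical name, not the method in the PR.

```java
import java.util.Arrays;
import java.util.Locale;

public class OrElseThrowSketch {
    // Stand-in for Kafka's OffsetResetStrategy enum.
    enum OffsetResetStrategy { LATEST, EARLIEST, NONE }

    // Resolves the configured strategy, throwing directly via orElseThrow()
    // instead of a separate Preconditions.checkArgument call afterwards.
    static OffsetResetStrategy parse(String offsetResetConfig) {
        return Arrays.stream(OffsetResetStrategy.values())
                .filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
                .findAny()
                .orElseThrow(
                        () ->
                                new IllegalArgumentException(
                                        "auto.offset.reset can not be set to "
                                                + offsetResetConfig));
    }

    public static void main(String[] args) {
        System.out.println(parse("earliest"));
        try {
            parse("bogus");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This removes the nullable intermediate variable entirely, so there is no window in which `offsetResetStrategy` can be observed as `null`.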
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -393,7 +396,27 @@ public int hashCode() {
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
break;
case GROUP_OFFSETS:
-                kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
+                String offsetResetConfig =
+                        properties.getProperty(
+                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                OffsetResetStrategy.NONE.name());
+                OffsetResetStrategy offsetResetStrategy =
+                        Arrays.stream(OffsetResetStrategy.values())
+                                .filter(
+                                        ors ->
+                                                ors.name()
+                                                        .equals(
+                                                                offsetResetConfig.toUpperCase(
+                                                                        Locale.ROOT)))
Review comment:
Nit: This is a bit hard to read maybe extract a method.
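One way the extraction could look, as a rough sketch. The enum, the config key constant, and the helper name `getResetStrategy` are stand-ins chosen for illustration, not the names in the PR.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

public class ExtractMethodSketch {
    // Stand-ins for the Kafka types; names are illustrative only.
    enum OffsetResetStrategy { LATEST, EARLIEST, NONE }

    static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";

    // Extracted helper: the GROUP_OFFSETS switch branch can then read as a
    // single call instead of an inlined six-level stream pipeline.
    static OffsetResetStrategy getResetStrategy(Properties properties) {
        String offsetResetConfig =
                properties.getProperty(
                        AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.NONE.name());
        return Arrays.stream(OffsetResetStrategy.values())
                .filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
                .findAny()
                .orElse(null);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AUTO_OFFSET_RESET_CONFIG, "latest");
        System.out.println(getResetStrategy(props));
        System.out.println(getResetStrategy(new Properties()));
    }
}
```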
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -821,6 +826,175 @@ public void testPerPartitionWatermarkWithIdleSource() throws Exception {
deleteTestTopic(topic);
}
+ @Test
+ public void testStartFromGroupOffsetsLatest() throws Exception {
+ testStartFromGroupOffsets("latest", Collections.emptyList());
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsEarliest() throws Exception {
+ final List<String> expected =
+ Arrays.asList(
+ "+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 3]", "+I[1, 4]", "+I[1, 5]");
+ testStartFromGroupOffsets("earliest", expected);
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsNone() {
+ try {
+ testStartFromGroupOffsetsWithException("none", Collections.emptyList());
+ fail("None offset reset error.");
+ } catch (Exception e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ assertTrue(rootCause instanceof NoOffsetForPartitionException);
+ }
+ }
+
+ private void testStartFromGroupOffsets(String reset, List<String> expected)
Review comment:
The two methods `testStartFromGroupOffsets` and `testStartFromGroupOffsetsWithException` look like largely duplicated code. Can you extract the common parts into a method?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -821,6 +826,175 @@ public void testPerPartitionWatermarkWithIdleSource() throws Exception {
deleteTestTopic(topic);
}
+ @Test
+ public void testStartFromGroupOffsetsLatest() throws Exception {
+ testStartFromGroupOffsets("latest", Collections.emptyList());
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsEarliest() throws Exception {
+ final List<String> expected =
+ Arrays.asList(
+ "+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 3]", "+I[1, 4]", "+I[1, 5]");
+ testStartFromGroupOffsets("earliest", expected);
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsNone() {
+ try {
+ testStartFromGroupOffsetsWithException("none", Collections.emptyList());
+ fail("None offset reset error.");
+ } catch (Exception e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ assertTrue(rootCause instanceof NoOffsetForPartitionException);
+ }
+ }
+
+ private void testStartFromGroupOffsets(String reset, List<String> expected)
+ throws ExecutionException, InterruptedException {
+ // we always use a different topic name for each parameterized topic,
+ // in order to make sure the topic can be created.
+ final String tableName = "Table" + format + reset;
+ final String topic = "groupOffset_" + format + reset;
+ createTestTopic(topic, 4, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+ String groupId = format + reset;
+ String bootstraps = getBootstrapServers();
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+
+ final String createTableSql =
+ "CREATE TABLE %s (\n"
+ + " `partition_id` INT,\n"
+ + " `value` INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' = 'group-offsets',\n"
+ + " 'properties.auto.offset.reset' = '%s',\n"
+ + " 'format' = '%s'\n"
+ + ")";
+ tEnv.executeSql(
+ String.format(
+ createTableSql, tableName, topic, bootstraps, groupId, reset, format));
+
+ String initialValues =
+ "INSERT INTO "
+ + tableName
+ + "\n"
+ + "VALUES\n"
+ + " (0, 0),\n"
+ + " (0, 1),\n"
+ + " (0, 2),\n"
+ + " (1, 3),\n"
+ + " (1, 4),\n"
+ + " (1, 5)\n";
+ tEnv.executeSql(initialValues).await();
+
+ // ---------- Consume stream from Kafka -------------------
+
+ env.setParallelism(1);
+ String sinkName = "mySink" + format + reset;
+ String createSink =
+ "CREATE TABLE "
+ + sinkName
+ + "(\n"
+ + " `partition_id` INT,\n"
+ + " `value` INT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values'\n"
+ + ")";
+ tEnv.executeSql(createSink);
+
+ TableResult tableResult =
+ tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM " + tableName);
+
+ if (expected.size() == 0) {
+ KafkaTableTestUtils.waitingEmptyResults(sinkName, Duration.ofSeconds(5));
Review comment:
I am not sure it is a good idea to introduce another test util for waiting. We already have `CommonTestUtils#waitUntil`; can you take a look at that?
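For illustration, a generic wait helper usually has roughly this shape. This is a minimal sketch in the spirit of the `CommonTestUtils` helper the review points to; the exact name and signature in Flink differ, so treat everything here as an assumption.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class WaitUntilSketch {
    // Polls the condition until it holds or the timeout elapses. A single
    // helper like this covers both "results arrived" and "still empty after
    // a grace period" checks, without a dedicated waiting util per case.
    static void waitUntil(Supplier<Boolean> condition, Duration timeout, String errorMessage)
            throws InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.get()) {
            if (System.nanoTime() > deadlineNanos) {
                throw new IllegalStateException(errorMessage);
            }
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        waitUntil(
                () -> System.currentTimeMillis() - start >= 50,
                Duration.ofSeconds(5),
                "Condition was not met before the timeout.");
        System.out.println("condition met");
    }
}
```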
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -380,6 +389,53 @@ public void testTableSourceCommitOnCheckpointDisabled() {
.noDefaultValue()));
}
+ @ParameterizedTest
+ @MethodSource("offsetResetStrategy")
+ public void testTableSourceSetOffsetReset(final String strategyName) {
+ testSetOffsetResetForStartFromGroupOffsets(strategyName);
+ }
+
+ @Test
+ public void testTableSourceSetOffsetResetWithException() {
+ String errorStrategy = "errorStrategy";
Assertions.assertThatThrownBy(() -> testTableSourceSetOffsetReset(errorStrategy))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ String.format(
+ "%s can not be set to %s. Valid values: [latest, earliest, none]",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, errorStrategy));
+ }
+
+ private void testSetOffsetResetForStartFromGroupOffsets(String value) {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options -> {
+ options.remove("scan.startup.mode");
+ if (value != null) {
+ options.put(
+ PROPERTIES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ value);
+ }
+ });
+ final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions);
+ assertThat(tableSource, instanceOf(KafkaDynamicSource.class));
Review comment:
Please use AssertJ for all the new assertions.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -380,6 +381,44 @@ public void testTableSourceCommitOnCheckpointDisabled() {
.noDefaultValue()));
}
+ @Test
+ public void testTableSourceSetOffsetReset() {
+ testSetOffsetResetForStartFromGroupOffsets(null);
+ testSetOffsetResetForStartFromGroupOffsets("none");
+ testSetOffsetResetForStartFromGroupOffsets("earliest");
+ testSetOffsetResetForStartFromGroupOffsets("latest");
+ }
+
+ private void testSetOffsetResetForStartFromGroupOffsets(String value) {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options -> {
+ options.remove("scan.startup.mode");
+ if (value != null) {
Review comment:
Nit: Personally, I am a big fan of handling all cases explicitly
```java
if (value == null) {
return;
}
...
```
It often makes the code more readable.
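As a self-contained illustration of that guard-clause style (the method and its strings are made up for this sketch, not taken from the PR):

```java
public class GuardClauseSketch {
    // Guard-clause version of the branch from the review: the null case is
    // handled explicitly up front, so the main path is not nested inside
    // an if block.
    static String applyOption(String value) {
        if (value == null) {
            return "option not set";
        }
        return "option set to " + value;
    }

    public static void main(String[] args) {
        System.out.println(applyOption(null));
        System.out.println(applyOption("earliest"));
    }
}
```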
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -380,6 +389,53 @@ public void testTableSourceCommitOnCheckpointDisabled() {
.noDefaultValue()));
}
+ @ParameterizedTest
Review comment:
Can you also quickly migrate all the other tests to JUnit 5? Mixing the two versions in the same class causes problems. It would be nice to do that as a separate commit.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -380,6 +389,53 @@ public void testTableSourceCommitOnCheckpointDisabled() {
.noDefaultValue()));
}
+ @ParameterizedTest
+ @MethodSource("offsetResetStrategy")
Review comment:
I do not think the method source is necessary here; you can use the following:
```java
@ValueSource(strings = {"none", "earliest", "latest"})
@NullSource
```
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -821,6 +826,175 @@ public void testPerPartitionWatermarkWithIdleSource() throws Exception {
deleteTestTopic(topic);
}
+ @Test
+ public void testStartFromGroupOffsetsLatest() throws Exception {
+ testStartFromGroupOffsets("latest", Collections.emptyList());
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsEarliest() throws Exception {
+ final List<String> expected =
+ Arrays.asList(
+ "+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 3]", "+I[1, 4]", "+I[1, 5]");
+ testStartFromGroupOffsets("earliest", expected);
+ }
+
+ @Test
+ public void testStartFromGroupOffsetsNone() {
+ try {
+ testStartFromGroupOffsetsWithException("none", Collections.emptyList());
+ fail("None offset reset error.");
+ } catch (Exception e) {
Review comment:
Why do you not catch only the `ExecutionException`? Currently, afaict, the `fail` call is also caught and handled.
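The hazard can be shown with plain `CompletableFuture` from the JDK (a minimal sketch; the real test goes through Flink's `TableResult`, not a raw future):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CatchOnlyExecutionExceptionSketch {
    // Runs a failing async job and returns the root-cause message. Because
    // only ExecutionException is caught, the AssertionError guarding the
    // "job unexpectedly succeeded" path is NOT swallowed.
    static String rootCauseOfFailure() throws InterruptedException {
        CompletableFuture<Void> job =
                CompletableFuture.runAsync(
                        () -> {
                            throw new IllegalStateException("no committed offsets");
                        });
        try {
            job.get();
            // With a broad catch (Exception e) this failure marker would be
            // caught too and the test would pass vacuously -- the reviewer's
            // point about fail() being handled by the catch block.
            throw new AssertionError("expected the job to fail");
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("root cause: " + rootCauseOfFailure());
    }
}
```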
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]