Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2509#discussion_r101045081
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -438,6 +439,215 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+	/**
+	 * This test ensures that when explicitly set to start from earliest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromEarliestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that when explicitly set to start from latest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromLatestOffsets() throws Exception {
+		// 50 records written to each of 3 partitions before launching a latest-starting consuming job
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		// each partition will be written an extra 200 records
+		final int extraRecordsInEachPartition = 200;
+
+		// all already existing data in the topic, before the consuming topology has started, should be ignored
+		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		// job names for the topologies for writing and consuming the extra records
+		final String consumeExtraRecordsJobName = "Consume Extra Records Job";
+		final String writeExtraRecordsJobName = "Write Extra Records Job";
+
+		// serialization / deserialization schemas for writing and consuming the extra records
+		final TypeInformation<Tuple2<Integer, Integer>> resultType =
+			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+			new KeyedSerializationSchemaWrapper<>(
+				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+			new KeyedDeserializationSchemaWrapper<>(
+				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+ // setup and run the latest-consuming job
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		final Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
+			kafkaServer.getConsumer(topicName, deserSchema, readProps);
+		latestReadingConsumer.setStartFromLatest();
+
+		env
+			.addSource(latestReadingConsumer).setParallelism(parallelism)
+			.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Object>() {
+				@Override
+				public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
+					if (value.f1 - recordsInEachPartition < 0) {
+						throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
+					}
+				}
+			}).setParallelism(1)
+			.addSink(new DiscardingSink<>());
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		Thread consumeThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					env.execute(consumeExtraRecordsJobName);
+				} catch (Throwable t) {
+					if (!(t.getCause() instanceof JobCancellationException))
+						error.set(t);
--- End diff --
As per the unwritten Flink style guide, we always use {} after an if().
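
For illustration, a minimal sketch of how the quoted catch block might look with the braces added (this is a suggested form, not code from the PR; env, consumeExtraRecordsJobName, and error are the names already declared in the diff above):

	try {
		env.execute(consumeExtraRecordsJobName);
	} catch (Throwable t) {
		// wrap the single-statement body in {} per the style rule
		if (!(t.getCause() instanceof JobCancellationException)) {
			error.set(t);
		}
	}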