[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15865857#comment-15865857 ]
ASF GitHub Bot commented on FLINK-4280:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2509#discussion_r101045081
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -438,6 +439,215 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+    /**
+     * This test ensures that when explicitly set to start from earliest record, the consumer
+     * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+     */
+    public void runStartFromEarliestOffsets() throws Exception {
+        // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+        final int parallelism = 3;
+        final int recordsInEachPartition = 50;
+
+        final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env.getConfig().disableSysoutLogging();
+        env.setParallelism(parallelism);
+
+        Properties readProps = new Properties();
+        readProps.putAll(standardProps);
+        readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+        // the committed offsets should be ignored
+        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+        kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+        kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+        readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+        kafkaOffsetHandler.close();
+        deleteTestTopic(topicName);
+    }
+
+    /**
+     * This test ensures that when explicitly set to start from latest record, the consumer
+     * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+     */
+    public void runStartFromLatestOffsets() throws Exception {
+        // 50 records written to each of 3 partitions before launching a latest-starting consuming job
+        final int parallelism = 3;
+        final int recordsInEachPartition = 50;
+
+        // each partition will be written an extra 200 records
+        final int extraRecordsInEachPartition = 200;
+
+        // all already existing data in the topic, before the consuming topology has started, should be ignored
+        final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+        // the committed offsets should be ignored
+        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+        kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+        kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+        // job names for the topologies for writing and consuming the extra records
+        final String consumeExtraRecordsJobName = "Consume Extra Records Job";
+        final String writeExtraRecordsJobName = "Write Extra Records Job";
+
+        // serialization / deserialization schemas for writing and consuming the extra records
+        final TypeInformation<Tuple2<Integer, Integer>> resultType =
+            TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+        final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+            new KeyedSerializationSchemaWrapper<>(
+                new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+        final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+            new KeyedDeserializationSchemaWrapper<>(
+                new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+        // setup and run the latest-consuming job
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+        env.getConfig().disableSysoutLogging();
+        env.setParallelism(parallelism);
+
+        final Properties readProps = new Properties();
+        readProps.putAll(standardProps);
+        readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+        FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
+            kafkaServer.getConsumer(topicName, deserSchema, readProps);
+        latestReadingConsumer.setStartFromLatest();
+
+        env
+            .addSource(latestReadingConsumer).setParallelism(parallelism)
+            .flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Object>() {
+                @Override
+                public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
+                    if (value.f1 - recordsInEachPartition < 0) {
+                        throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
+                    }
+                }
+            }).setParallelism(1)
+            .addSink(new DiscardingSink<>());
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        Thread consumeThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    env.execute(consumeExtraRecordsJobName);
+                } catch (Throwable t) {
+                    if (!(t.getCause() instanceof JobCancellationException))
+                        error.set(t);
--- End diff --
As per the unwritten Flink styleguide, we are always using {} after an if().
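For illustration, the flagged statement with the braces added (the same code as in the diff above, only reformatted per the style guide) would read something like:
{code}
Thread consumeThread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            env.execute(consumeExtraRecordsJobName);
        } catch (Throwable t) {
            // only record unexpected failures; cancelling the job is the expected shutdown path
            if (!(t.getCause() instanceof JobCancellationException)) {
                error.set(t);
            }
        }
    }
});
{code}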
> New Flink-specific option to set starting position of Kafka consumer without
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in
> topics for the Flink Kafka consumer, users set the Kafka config
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if
> users were trying to find a way to "read topics from a starting position".
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer
> resembles Kafka's original intent for the setting: first, existing external
> offsets committed to the ZK / brokers will be checked; only if none exist
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without
> taking the external offsets into account. The original behaviour (reference
> external offsets first) can be kept as a user option, so that it remains
> available for heavy Kafka users who need to interoperate with existing
> non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added,
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // this won't have an effect on the starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we would need to decide on is the default value for
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. It matches the way users generally interpret "read from a starting
> position". Since the Flink Kafka connector is essentially a "high-level"
> Kafka consumer for Flink users, I think it is reasonable to add
> Flink-specific functionality that users will find useful, even though it
> wasn't part of Kafka's original consumer design.
> 2. It makes the idea that "the Kafka offset store (ZK / brokers) is used only
> to expose progress to the outside world, and not to manipulate how Kafka
> topics are read in Flink (unless users opt to do so)" even more definite and
> solid. There was some discussion on this aspect in the PR
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this
> further decouples Flink's internal offset checkpointing from Kafka's external
> offset store.
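For reference, the pull request's test code quoted above exercises this feature through setter methods and a {{StartupMode}} enum on the consumer rather than through a {{flink.starting-position}} property. A minimal sketch of that style of user-facing configuration follows; only {{setStartFromLatest()}} and {{StartupMode.EARLIEST}} appear in the diff, while the other setter names, the consumer class, and the topic / group values are placeholders assumed for illustration:
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group"); // no longer influences the starting position; may still be used for offset committing

// hypothetical topic and schema, for illustration only
FlinkKafkaConsumer09<String> consumer =
    new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);

// choose exactly one starting position; committed offsets and "auto.offset.reset" are then ignored
consumer.setStartFromEarliest();         // "earliest"
// consumer.setStartFromLatest();        // "latest" (the call used in the test diff above)
// consumer.setStartFromGroupOffsets();  // "external-offsets", i.e. the original lookup behaviour

env.addSource(consumer).print();
{code}
Whichever surface is chosen, the open question raised in the description remains which of these modes should be the default when the user sets nothing.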
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)