[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561915#comment-15561915 ]
ASF GitHub Bot commented on FLINK-4723:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2580#discussion_r82576069
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -208,6 +207,235 @@ public void runFailOnNoBrokerTest() throws Exception {
}
}
}
+
+	/**
+	 * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
+	 */
+	public void runCommitOffsetsToKafka() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(200);
+
+		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
+		stream.addSink(new DiscardingSink<String>());
+
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final Thread runner = new Thread("runner") {
+			@Override
+			public void run() {
+				try {
+					env.execute();
+				}
+				catch (Throwable t) {
+					if (!(t.getCause() instanceof JobCancellationException)) {
+						errorRef.set(t);
+					}
+				}
+			}
+		};
+		runner.start();
+
+		final Long l50 = 50L; // the final committed offset in Kafka should be 50
+		final long deadline = 30000 + System.currentTimeMillis();
+
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+		do {
+			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+				break;
+			}
+
+			Thread.sleep(100);
+		}
+		while (System.currentTimeMillis() < deadline);
+
+		// cancel the job
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		final Throwable t = errorRef.get();
+		if (t != null) {
+			throw new RuntimeException("Job failed with an exception", t);
+		}
+
+		// final check to see if offsets are correctly in Kafka
+		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+		Assert.assertEquals(Long.valueOf(50L), o1);
+		Assert.assertEquals(Long.valueOf(50L), o2);
+		Assert.assertEquals(Long.valueOf(50L), o3);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test first writes a total of 200 records to a test topic, reads the first 100 so that some offsets are
--- End diff ---
300, 150
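
For context on the semantics the test above asserts: Kafka's high-level consumer API treats a committed offset as the position of "the next record to read". Below is a minimal standalone sketch of inspecting a committed offset with the 0.9 consumer API, the same check the test's polling loop performs through KafkaOffsetHandler. It is not part of the patch; the broker address, group id, and topic name are assumptions for illustration only.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
		props.setProperty("group.id", "flink-test-group"); // assumed consumer group
		props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
		props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
			TopicPartition partition = new TopicPartition("testCommitOffsetsToKafkaTopic", 0);
			consumer.assign(Collections.singletonList(partition));

			// after records 0-49 of the partition are fully processed and committed,
			// the committed offset should be 50 (the next record to read), not 49
			OffsetAndMetadata committed = consumer.committed(partition);
			System.out.println("committed offset: " + (committed == null ? "none" : committed.offset()));
		}
	}
}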
> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
> ---------------------------------------------------------------------------------
>
> Key: FLINK-4723
> URL: https://issues.apache.org/jira/browse/FLINK-4723
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.3
>
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be
> "the next offset that consumers should read (in Kafka terms, the 'position')".
> This is already fixed for the 0.9 consumer by FLINK-4618, which increments the
> offsets committed back to Kafka by the 0.9 consumer by 1, so that the internal
> {{KafkaConsumer}} picks up the correct start position when committed offsets
> are present. This fix was required because with the Kafka 0.9 APIs, the start
> position is implicitly determined from the committed offsets.
> However, since the 0.8 consumer handles offset committing and start positions
> using Flink's own {{ZookeeperOffsetHandler}} and not Kafka's high-level APIs,
> the 0.8 consumer did not require such a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and
> 0.9 to the definition above.
> Otherwise, if a user first uses the 0.8 consumer to read data, leaving
> Flink-committed offsets in ZK, and then uses a high-level 0.8 Kafka consumer
> to read the same topic in a non-Flink application, the first record read will
> be a duplicate (because, as described above, Kafka high-level consumers expect
> the committed offsets to be "the next record to process" and not "the last
> processed record").
> This requires that the ZK offsets committed by the 0.8 consumer also be
> incremented by 1, and that the initialization of Flink's internal offsets from
> the acquired ZK offsets be adjusted accordingly (see the sketch below).
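
To make the proposed convention concrete: the unification boils down to translating, in both directions, between Flink's internal "last processed" offset and the committed "position". A minimal sketch of that translation follows; the helper names are hypothetical and not the actual {{ZookeeperOffsetHandler}} API.

public final class OffsetCommitConvention {

	private OffsetCommitConvention() {}

	/** The offset to commit to Kafka / ZK: the position of the next record to process. */
	static long toCommitOffset(long lastProcessedOffset) {
		return lastProcessedOffset + 1;
	}

	/** The internal offset to restore from a committed offset: the last processed record. */
	static long toInternalOffset(long committedOffset) {
		return committedOffset - 1;
	}

	public static void main(String[] args) {
		long lastProcessed = 49; // records 0-49 of a partition have been processed
		long committed = toCommitOffset(lastProcessed); // 50, matching Kafka's convention
		long restoredInternal = toInternalOffset(committed); // 49, so processing resumes with record 50
		System.out.println("commit " + committed + ", restore internal " + restoredInternal);
	}
}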
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)