[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561914#comment-15561914 ]
ASF GitHub Bot commented on FLINK-4723:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2580#discussion_r82577343

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -208,6 +207,235 @@ public void runFailOnNoBrokerTest() throws Exception {
 			}
 		}
 	}
+
+	/**
+	 * Ensures that the offsets committed to Kafka are the offsets of "the next record to process".
+	 */
+	public void runCommitOffsetsToKafka() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(200);
+
+		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
+		stream.addSink(new DiscardingSink<String>());
+
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final Thread runner = new Thread("runner") {
+			@Override
+			public void run() {
+				try {
+					env.execute();
+				}
+				catch (Throwable t) {
+					if (!(t.getCause() instanceof JobCancellationException)) {
+						errorRef.set(t);
+					}
+				}
+			}
+		};
+		runner.start();
+
+		final Long l50 = 50L; // the final committed offset in Kafka should be 50
+		final long deadline = 30000 + System.currentTimeMillis();
+
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+		do {
+			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+				break;
+			}
+
+			Thread.sleep(100);
+		}
+		while (System.currentTimeMillis() < deadline);
+
+		// cancel the job
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		final Throwable t = errorRef.get();
+		if (t != null) {
+			throw new RuntimeException("Job failed with an exception", t);
+		}
+
+		// final check to see if the offsets committed to Kafka are correct
+		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+		Assert.assertEquals(Long.valueOf(50L), o1);
+		Assert.assertEquals(Long.valueOf(50L), o2);
+		Assert.assertEquals(Long.valueOf(50L), o3);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test first writes 300 records to each partition of a test topic, reads part of them so that some
+	 * offsets are committed to Kafka, and then starts the consumer again to read the remaining records,
+	 * starting from the committed offsets. The test ensures that whatever offsets were committed to Kafka,
+	 * the consumer correctly picks them up and starts at the correct position.
+	 */
+	public void runStartFromKafkaCommitOffsets() throws Exception {
+		final int parallelism = 3;
+		final int recordsInEachPartition = 300;
+
+		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env1.getConfig().disableSysoutLogging();
+		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env1.setParallelism(parallelism);
+		env1.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
+
+		env1
+			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+			.map(new ThrottledMapper<String>(50))
+			.map(new MapFunction<String, Object>() {
+				int count = 0;
+				@Override
+				public Object map(String value) throws Exception {
+					count++;
+					if (count == 150) {
+						throw new SuccessException();
+					}
+					return null;
+				}
+			})
+			.addSink(new DiscardingSink<>());
+
+		tryExecute(env1, "Read some records to commit offsets to Kafka");
+
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+		LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
--- End diff --

I wonder whether it makes sense to check that at least one of o1, o2 and o3 is not 300. If they are all 300, the test below wouldn't actually verify that the consumer starts from the committed offsets.

> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-4723
>                 URL: https://issues.apache.org/jira/browse/FLINK-4723
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0, 1.1.3
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be "the next offset that consumers should read" (in Kafka terms, the "position").
> This is already fixed for the 0.9 consumer by FLINK-4618: the offsets committed back to Kafka by the 0.9 consumer are incremented by 1, so that the internal {{KafkaConsumer}} picks up the correct start position when committed offsets are present. This fix was required because the start position from committed offsets was implicitly determined by the Kafka 0.9 APIs.
> However, since the 0.8 consumer handles offset committing and start positions using Flink's own {{ZookeeperOffsetHandler}} and not Kafka's high-level APIs, the 0.8 consumer did not require a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and 0.9 to the definition above.
> Otherwise, if a user first uses the 0.8 consumer to read data (leaving Flink-committed offsets in ZK), and then uses a high-level 0.8 Kafka consumer to read the same topic in a non-Flink application, the first record will be a duplicate (because, as described above, Kafka high-level consumers expect the committed offset to be "the next record to process" and not "the last processed record").
> This requires the committed ZK offsets in 0.8 to also be incremented by 1, and changing how Flink's internal offsets are initialized from the acquired ZK offsets.
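To make the definition above concrete, here is a minimal sketch of the unified commit/restore rule; the class and method names are illustrative only, not the actual Flink connector internals:

{code:java}
// Sketch of the proposed unified semantics: a committed offset is Kafka's
// "position", i.e. the offset of the next record to process. The names below
// are hypothetical and only illustrate the rule.
public class OffsetSemanticsSketch {

	/** Offset to commit once a record has been fully processed: last processed + 1. */
	static long offsetToCommit(long lastProcessedOffset) {
		return lastProcessedOffset + 1;
	}

	/** Start position derived from a committed offset: the committed offset itself is read first. */
	static long startOffsetFromCommitted(long committedOffset) {
		return committedOffset;
	}

	public static void main(String[] args) {
		// A partition whose records 0-49 are fully processed should have 50 committed,
		// and a restarted consumer should begin reading at offset 50 -- exactly what
		// runCommitOffsetsToKafka() above asserts.
		long committed = offsetToCommit(49);
		System.out.println("committed offset = " + committed);                            // 50
		System.out.println("restart position = " + startOffsetFromCommitted(committed)); // 50
	}
}
{code}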
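As for the check suggested in the diff comment above, one possible form (a sketch only, reusing the o1/o2/o3 values already fetched in the test, where recordsInEachPartition is 300) could be:

{code:java}
// Sketch of the suggested sanity check: fail fast if the first phase already
// consumed and committed every record, since the restart phase would then
// begin at the end of the topic and not exercise the committed-offset path.
boolean atLeastOnePartitionNotFullyCommitted =
	(o1 == null || o1 < recordsInEachPartition) ||
	(o2 == null || o2 < recordsInEachPartition) ||
	(o3 == null || o3 < recordsInEachPartition);
Assert.assertTrue(
	"All committed offsets are already at the end of the topic; the restart phase would not verify anything",
	atLeastOnePartitionNotFullyCommitted);
{code}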