[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561914#comment-15561914 ]

ASF GitHub Bot commented on FLINK-4723:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2580#discussion_r82577343
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -208,6 +207,235 @@ public void runFailOnNoBrokerTest() throws Exception {
                        }
                }
        }
    +
    +   /**
    +    * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
    +    */
    +   public void runCommitOffsetsToKafka() throws Exception {
    +           // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +           final int parallelism = 3;
    +           final int recordsInEachPartition = 50;
    +
    +           final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
    +
    +           final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +           env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +           env.setParallelism(parallelism);
    +           env.enableCheckpointing(200);
    +
    +           DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
    +           stream.addSink(new DiscardingSink<String>());
    +
    +           final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    +           final Thread runner = new Thread("runner") {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   env.execute();
    +                           }
    +                           catch (Throwable t) {
    +                                   if (!(t.getCause() instanceof JobCancellationException)) {
    +                                           errorRef.set(t);
    +                                   }
    +                           }
    +                   }
    +           };
    +           runner.start();
    +
    +           final Long l50 = 50L; // the final committed offset in Kafka should be 50
    +           final long deadline = 30000 + System.currentTimeMillis();
    +
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +
    +           do {
    +                   Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
    +                   Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
    +                   Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    +
    +                   if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
    +                           break;
    +                   }
    +
    +                   Thread.sleep(100);
    +           }
    +           while (System.currentTimeMillis() < deadline);
    +
    +           // cancel the job
    +           JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
    +
    +           final Throwable t = errorRef.get();
    +           if (t != null) {
    +                   throw new RuntimeException("Job failed with an exception", t);
    +           }
    +
    +           // final check to see if offsets are correctly in Kafka
    +           Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
    +           Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
    +           Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    +           Assert.assertEquals(Long.valueOf(50L), o1);
    +           Assert.assertEquals(Long.valueOf(50L), o2);
    +           Assert.assertEquals(Long.valueOf(50L), o3);
    +
    +           kafkaOffsetHandler.close();
    +           deleteTestTopic(topicName);
    +   }
    +
    +   /**
    +    * This test first writes 300 records to each of the test topic's 3 partitions, reads part of them so that some
    +    * offsets are committed to Kafka, and then starts the consumer again to read the remaining records starting
    +    * from the committed offsets. The test ensures that whatever offsets were committed to Kafka, the consumer
    +    * correctly picks them up and starts at the correct position.
    +    */
    +   public void runStartFromKafkaCommitOffsets() throws Exception {
    +           final int parallelism = 3;
    +           final int recordsInEachPartition = 300;
    +
    +           final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
    +
    +           final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +           env1.getConfig().disableSysoutLogging();
    +           env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +           env1.setParallelism(parallelism);
    +           env1.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
    +
    +           env1
    +                   .addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    +                   .map(new ThrottledMapper<String>(50))
    +                   .map(new MapFunction<String, Object>() {
    +                           int count = 0;
    +                           @Override
    +                           public Object map(String value) throws Exception {
    +                                   count++;
    +                                   if (count == 150) {
    +                                           throw new SuccessException();
    +                                   }
    +                                   return null;
    +                           }
    +                   })
    +                   .addSink(new DiscardingSink<>());
    +
    +           tryExecute(env1, "Read some records to commit offsets to Kafka");
    +
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +
    +           Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
    +           Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
    +           Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    +
    +           LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
    --- End diff --
    
    I wonder whether it makes sense to check that at least one of o1, o2 and o3 is not 300. If they are all 300, the test below would have no remaining records to read from the committed offsets.
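    
    A minimal sketch of such a check (purely illustrative, not part of the PR; it reuses the Long values o1/o2/o3 and the JUnit Assert already used in the test above):
    
        // Suggested guard: if every partition's committed offset is already 300,
        // the "start from committed offsets" part of the test would have nothing left to read.
        Long endOffset = 300L;
        boolean atLeastOneBelowEnd =
                !endOffset.equals(o1) || !endOffset.equals(o2) || !endOffset.equals(o3);
        Assert.assertTrue(
                "All committed offsets are already at 300; the restart would read no records",
                atLeastOneBelowEnd);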


> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-4723
>                 URL: https://issues.apache.org/jira/browse/FLINK-4723
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0, 1.1.3
>
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be 
> "the next offset that consumers should read (in Kafka terms, the 'position')".
> This is already fixed for the 0.9 consumer by FLINK-4618, by incrementing the 
> offsets the 0.9 consumer commits back to Kafka by 1, so that the internal 
> {{KafkaConsumer}} picks up the correct start position when committed offsets 
> are present. This fix was required because the start position from committed 
> offsets was implicitly determined with the Kafka 0.9 APIs.
> However, since the 0.8 consumer handles offset committing and start position 
> using Flink's own {{ZookeeperOffsetHandler}} and not Kafka's high-level APIs, 
> the 0.8 consumer did not require a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and 
> 0.9 to the definition above.
> Otherwise, if a user first uses the 0.8 consumer to read data and 
> has Flink-committed offsets in ZK, and then uses a high-level 0.8 Kafka 
> consumer to read the same topic in a non-Flink application, the first record 
> will be a duplicate (because, as described above, Kafka high-level consumers 
> expect the committed offsets to be "the next record to process" and not "the 
> last processed record").
> This requires the ZK offsets committed by the 0.8 consumer to also be 
> incremented by 1, and changing how Flink's internal offsets are initialized 
> in accordance with the ZK offsets that are read back.
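
A minimal sketch of the "+1" convention described above (the zkHandler object and its method names are hypothetical placeholders, not the actual ZookeeperOffsetHandler API):

    // Illustration only: after a checkpoint completes, the last record read from a
    // partition was offset 49, so the offset written to ZK is 49 + 1 = 50.
    long lastProcessedOffset = 49L;
    long offsetToCommit = lastProcessedOffset + 1;                        // the "position": next record to read
    zkHandler.setOffsetInZooKeeper(topic, partition, offsetToCommit);     // hypothetical helper, writes 50

    // On startup, the committed value is already "the next record to process",
    // so the consumer starts reading from it directly, without further adjustment.
    long committed = zkHandler.getOffsetFromZooKeeper(topic, partition);  // hypothetical helper, reads 50
    long nextOffsetToRead = committed;                                    // first record emitted is offset 50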


