Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4190#discussion_r124961658
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
    @@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws 
Exception {
     
                final String topicName = 
writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
     
    -           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
    +           // read some records so that some offsets are committed to Kafka
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.getConfig().disableSysoutLogging();
    +           
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +           env.setParallelism(parallelism);
    +           env.enableCheckpointing(20); // fast checkpoints to make sure 
we commit some offsets
     
    -           Long o1;
    -           Long o2;
    -           Long o3;
    -           int attempt = 0;
    -           // make sure that o1, o2, o3 are not all null before proceeding
    -           do {
    -                   attempt++;
    -                   LOG.info("Attempt " + attempt + " to read records and 
commit some offsets to Kafka");
    -
    -                   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -                   env.getConfig().disableSysoutLogging();
    -                   
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -                   env.setParallelism(parallelism);
    -                   env.enableCheckpointing(20); // fast checkpoints to 
make sure we commit some offsets
    -
    -                   env
    -                           .addSource(kafkaServer.getConsumer(topicName, 
new SimpleStringSchema(), standardProps))
    -                           .map(new ThrottledMapper<String>(consumePause))
    -                           .map(new MapFunction<String, Object>() {
    -                                   int count = 0;
    -                                   @Override
    -                                   public Object map(String value) throws 
Exception {
    -                                           count++;
    -                                           if (count == recordsToConsume) {
    -                                                   throw new 
SuccessException();
    -                                           }
    -                                           return null;
    +           env
    +                   .addSource(kafkaServer.getConsumer(topicName, new 
SimpleStringSchema(), standardProps))
    +                   .map(new ThrottledMapper<String>(consumePause))
    +                   .map(new MapFunction<String, Object>() {
    +                           int count = 0;
    +                           @Override
    +                           public Object map(String value) throws 
Exception {
    +                                   count++;
    +                                   if (count == recordsToConsume) {
    +                                           throw new SuccessException();
                                        }
    -                           })
    -                           .addSink(new DiscardingSink<>());
    +                                   return null;
    +                           }
    +                   })
    +                   .addSink(new DiscardingSink<>());
     
    -                   tryExecute(env, "Read some records to commit offsets to 
Kafka");
    +           tryExecute(env, "Read some records to commit offsets to Kafka");
     
    +           // make sure that we indeed have some offsets committed to Kafka
    +           Long o1 = null;
    +           Long o2 = null;
    +           Long o3 = null;
    +           KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
    +           while (o1 == null && o2 == null && o3 == null) {
    +                   Thread.sleep(100);
                        o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 
0);
                        o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 
1);
                        o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 
2);
    -           } while (o1 == null && o2 == null && o3 == null && attempt < 3);
    --- End diff --
    
    I understand the argument. Perhaps it is also a fact that this test is 
covering too much into one single test, hence the awkwardness in making it 
stable.
    I think it is sufficient to have 2 separate tests that replace this:
    (a) test that committed Kafka offsets are correct (there is already a 
ITCase for this)
    (b) test that committed offsets are correctly picked up and used correctly 
(there is actually also a test for this already).
    
    Hence, I would conclude that perhaps this test can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to