[ https://issues.apache.org/jira/browse/FLINK-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066416#comment-16066416 ]

ASF GitHub Bot commented on FLINK-7011:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4190#discussion_r124527723
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {
     
     		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
     
    -		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    +		// read some records so that some offsets are committed to Kafka
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.getConfig().disableSysoutLogging();
    +		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		env.setParallelism(parallelism);
    +		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
     
    -		Long o1;
    -		Long o2;
    -		Long o3;
    -		int attempt = 0;
    -		// make sure that o1, o2, o3 are not all null before proceeding
    -		do {
    -			attempt++;
    -			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
    -
    -			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -			env.getConfig().disableSysoutLogging();
    -			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -			env.setParallelism(parallelism);
    -			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
    -
    -			env
    -				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    -				.map(new ThrottledMapper<String>(consumePause))
    -				.map(new MapFunction<String, Object>() {
    -					int count = 0;
    -					@Override
    -					public Object map(String value) throws Exception {
    -						count++;
    -						if (count == recordsToConsume) {
    -							throw new SuccessException();
    -						}
    -						return null;
    +		env
    +			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    +			.map(new ThrottledMapper<String>(consumePause))
    +			.map(new MapFunction<String, Object>() {
    +				int count = 0;
    +				@Override
    +				public Object map(String value) throws Exception {
    +					count++;
    +					if (count == recordsToConsume) {
    +						throw new SuccessException();
     					}
    -				})
    -				.addSink(new DiscardingSink<>());
    +					return null;
    +				}
    +			})
    +			.addSink(new DiscardingSink<>());
     
    -			tryExecute(env, "Read some records to commit offsets to Kafka");
    +		tryExecute(env, "Read some records to commit offsets to Kafka");
     
    +		// make sure that we indeed have some offsets committed to Kafka
    +		Long o1 = null;
    +		Long o2 = null;
    +		Long o3 = null;
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    +		while (o1 == null && o2 == null && o3 == null) {
    +			Thread.sleep(100);
     			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
     			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
     			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    -		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
    --- End diff --
    
    I would still add a hard limit for attempts (20?); there's no benefit in a test that succeeds _at some point_. If it doesn't finish relatively quickly, the entire build will time out anyway.
    
    Also, when running tests locally, I suppose there isn't even a time limit that would kill the test...
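
    The bounded-wait shape suggested above could look roughly like this (a standalone sketch, not the actual `KafkaConsumerTestBase` code; `waitForCommittedOffset`, `MAX_ATTEMPTS`, and the `Supplier` lookup are illustrative names):

    ```java
    import java.util.function.Supplier;

    public class BoundedOffsetWait {

        // Hypothetical cap; the review suggests something around 20 attempts.
        static final int MAX_ATTEMPTS = 20;

        /**
         * Polls until the lookup yields a non-null committed offset or the
         * attempt limit is reached. Throwing on exhaustion makes the test fail
         * fast instead of hanging until the whole build times out.
         */
        static Long waitForCommittedOffset(Supplier<Long> lookup, long sleepMillis)
                throws InterruptedException {
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                Long offset = lookup.get();
                if (offset != null) {
                    return offset;
                }
                Thread.sleep(sleepMillis);
            }
            throw new IllegalStateException(
                "No offset committed after " + MAX_ATTEMPTS + " attempts");
        }

        public static void main(String[] args) throws InterruptedException {
            // Simulate an offset that only becomes visible on the third poll.
            final int[] calls = {0};
            Long offset = waitForCommittedOffset(
                () -> ++calls[0] < 3 ? null : 42L, 1);
            System.out.println("offset=" + offset + " after " + calls[0] + " polls");
            // prints: offset=42 after 3 polls
        }
    }
    ```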


> Instable Kafka testStartFromKafkaCommitOffsets failures on Travis
> -----------------------------------------------------------------
>
>                 Key: FLINK-7011
>                 URL: https://issues.apache.org/jira/browse/FLINK-7011
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Tests
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> Example:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/246703474/log.txt?X-Amz-Expires=30&X-Amz-Date=20170627T065647Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170627/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=dbfc90cfc386fef0990325b54ff74ee4d441944687e7fdaa73ce7b0c2b2ec0ea
> In general, the {{testStartFromKafkaCommitOffsets}} implementation is a bit of 
> overkill. Before continuing with the test, it runs a job that consumes some 
> records just for the sake of committing offsets to Kafka, and then waits for 
> some offsets to be committed (which leads to the instability), whereas we can 
> do that simply using the test base's {{OffsetHandler}}.
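
The simplification proposed above might be sketched like this (a hedged stand-in: the `OffsetHandler` interface and `setCommittedOffset` method here are illustrative, and the real handler in `KafkaTestEnvironment` may expose different methods):

```java
import java.util.HashMap;
import java.util.Map;

public class DirectOffsetCommitSketch {

    /** Minimal stand-in for the test base's offset handler. */
    interface OffsetHandler {
        void setCommittedOffset(String topic, int partition, long offset);
        Long getCommittedOffset(String topic, int partition);
    }

    /** In-memory implementation, just enough to illustrate the flow. */
    static class InMemoryOffsetHandler implements OffsetHandler {
        private final Map<String, Long> committed = new HashMap<>();

        @Override
        public void setCommittedOffset(String topic, int partition, long offset) {
            committed.put(topic + "-" + partition, offset);
        }

        @Override
        public Long getCommittedOffset(String topic, int partition) {
            return committed.get(topic + "-" + partition);
        }
    }

    public static void main(String[] args) {
        OffsetHandler handler = new InMemoryOffsetHandler();
        String topic = "testStartFromKafkaCommitOffsetsTopic";

        // Deterministically seed the offsets the test should start from,
        // instead of consuming records and waiting for a fast checkpoint
        // interval to commit them as a side effect.
        for (int partition = 0; partition < 3; partition++) {
            handler.setCommittedOffset(topic, partition, 23L);
        }

        System.out.println("partition 0 offset = " + handler.getCommittedOffset(topic, 0));
        // prints: partition 0 offset = 23
    }
}
```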



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
