[
https://issues.apache.org/jira/browse/FLINK-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069446#comment-16069446
]
ASF GitHub Bot commented on FLINK-7011:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4190#discussion_r124961658
--- Diff:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
---
@@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {
 	final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);

-	KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+	// read some records so that some offsets are committed to Kafka
+	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+	env.getConfig().disableSysoutLogging();
+	env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+	env.setParallelism(parallelism);
+	env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets

-	Long o1;
-	Long o2;
-	Long o3;
-	int attempt = 0;
-	// make sure that o1, o2, o3 are not all null before proceeding
-	do {
-		attempt++;
-		LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-		env
-			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-			.map(new ThrottledMapper<String>(consumePause))
-			.map(new MapFunction<String, Object>() {
-				int count = 0;
-				@Override
-				public Object map(String value) throws Exception {
-					count++;
-					if (count == recordsToConsume) {
-						throw new SuccessException();
-					}
-					return null;
+	env
+		.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+		.map(new ThrottledMapper<String>(consumePause))
+		.map(new MapFunction<String, Object>() {
+			int count = 0;
+			@Override
+			public Object map(String value) throws Exception {
+				count++;
+				if (count == recordsToConsume) {
+					throw new SuccessException();
 				}
-			})
-			.addSink(new DiscardingSink<>());
+				return null;
+			}
+		})
+		.addSink(new DiscardingSink<>());

-		tryExecute(env, "Read some records to commit offsets to Kafka");
+	tryExecute(env, "Read some records to commit offsets to Kafka");

+	// make sure that we indeed have some offsets committed to Kafka
+	Long o1 = null;
+	Long o2 = null;
+	Long o3 = null;
+	KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+	while (o1 == null && o2 == null && o3 == null) {
+		Thread.sleep(100);
 		o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
 		o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
 		o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-	} while (o1 == null && o2 == null && o3 == null && attempt < 3);
--- End diff --
I understand the argument. Perhaps part of the problem is that this test packs too much into a single test, hence the awkwardness in making it stable.
I think two separate tests would be sufficient to replace it:
(a) a test that the committed Kafka offsets are correct (there is already an ITCase for this), and
(b) a test that committed offsets are correctly picked up and used (there is actually already a test for this as well).
Hence, I would conclude that this test can perhaps be removed.
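As context for the stability discussion: the diff above replaces a fixed three-attempt retry with an unbounded `while` loop waiting for committed offsets. A bounded poll with a deadline avoids both hazards (giving up too early, or hanging forever). This is an illustrative, self-contained sketch; the `pollUntilNonNull` helper and the simulated offset source are assumptions for demonstration, not part of the Flink test base:

```java
import java.util.function.Supplier;

public class BoundedPoll {
	// Hypothetical helper: polls a supplier until it returns a non-null
	// value or the deadline passes, sleeping between attempts.
	static <T> T pollUntilNonNull(Supplier<T> supplier, long timeoutMillis, long intervalMillis)
			throws InterruptedException {
		long deadline = System.currentTimeMillis() + timeoutMillis;
		while (System.currentTimeMillis() < deadline) {
			T value = supplier.get();
			if (value != null) {
				return value;
			}
			Thread.sleep(intervalMillis);
		}
		return null; // caller decides whether a timeout fails the test
	}

	public static void main(String[] args) throws InterruptedException {
		// Simulated committed-offset lookup: becomes non-null after a few polls.
		final int[] calls = {0};
		Long offset = pollUntilNonNull(
				() -> ++calls[0] >= 3 ? Long.valueOf(42L) : null, 5000, 10);
		System.out.println("committed offset: " + offset);
	}
}
```

In the test, the three `getCommittedOffset` lookups would be wrapped this way with a generous timeout, so the test fails with a clear message instead of spinning indefinitely.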
> Instable Kafka testStartFromKafkaCommitOffsets failures on Travis
> -----------------------------------------------------------------
>
> Key: FLINK-7011
> URL: https://issues.apache.org/jira/browse/FLINK-7011
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Tests
> Affects Versions: 1.3.1, 1.4.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> Example:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/246703474/log.txt?X-Amz-Expires=30&X-Amz-Date=20170627T065647Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170627/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=dbfc90cfc386fef0990325b54ff74ee4d441944687e7fdaa73ce7b0c2b2ec0ea
> In general, the {{testStartFromKafkaCommitOffsets}} test implementation is a
> bit of overkill. Before continuing with the actual test, it writes some
> records solely for the sake of committing offsets to Kafka and then waits
> for some offsets to be committed (which is what causes the instability),
> whereas we can do that simply using the test base's {{OffsetHandler}}.
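The {{OffsetHandler}} idea can be sketched as below. This is an illustrative, in-memory stand-in; the real `KafkaTestEnvironment.KafkaOffsetHandler` reads and writes consumer-group offsets on an actual broker, and the interface shape and offset values here are assumptions for demonstration only:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetHandlerSketch {
	// Minimal stand-in for the test base's offset handler.
	interface KafkaOffsetHandler {
		Long getCommittedOffset(String topic, int partition);
		void setCommittedOffset(String topic, int partition, long offset);
	}

	// In-memory implementation for illustration only; the real handler
	// talks to a running Kafka broker.
	static class InMemoryOffsetHandler implements KafkaOffsetHandler {
		private final Map<String, Long> offsets = new HashMap<>();

		@Override
		public Long getCommittedOffset(String topic, int partition) {
			return offsets.get(topic + "-" + partition);
		}

		@Override
		public void setCommittedOffset(String topic, int partition, long offset) {
			offsets.put(topic + "-" + partition, offset);
		}
	}

	public static void main(String[] args) {
		KafkaOffsetHandler handler = new InMemoryOffsetHandler();
		// Instead of running a throttled job and waiting for checkpoints,
		// the test could seed committed offsets directly...
		handler.setCommittedOffset("testTopic", 0, 23);
		handler.setCommittedOffset("testTopic", 1, 31);
		// ...and then deterministically assert that a consumer configured to
		// start from group offsets resumes from these positions.
		System.out.println(handler.getCommittedOffset("testTopic", 0)); // 23
	}
}
```

Seeding offsets this way removes the timing-dependent "run a job until something is committed" phase, which is the flaky part of the current test.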
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)