Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4190#discussion_r124527723
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {
 		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
 
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		// read some records so that some offsets are committed to Kafka
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
 
-		Long o1;
-		Long o2;
-		Long o3;
-		int attempt = 0;
-		// make sure that o1, o2, o3 are not all null before proceeding
-		do {
-			attempt++;
-			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.getConfig().disableSysoutLogging();
-			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-			env
-				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-				.map(new ThrottledMapper<String>(consumePause))
-				.map(new MapFunction<String, Object>() {
-					int count = 0;
-					@Override
-					public Object map(String value) throws Exception {
-						count++;
-						if (count == recordsToConsume) {
-							throw new SuccessException();
-						}
-						return null;
-					}
-				})
-				.addSink(new DiscardingSink<>());
+		env
+			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+			.map(new ThrottledMapper<String>(consumePause))
+			.map(new MapFunction<String, Object>() {
+				int count = 0;
+				@Override
+				public Object map(String value) throws Exception {
+					count++;
+					if (count == recordsToConsume) {
+						throw new SuccessException();
+					}
+					return null;
+				}
+			})
+			.addSink(new DiscardingSink<>());
 
-			tryExecute(env, "Read some records to commit offsets to Kafka");
+		tryExecute(env, "Read some records to commit offsets to Kafka");
+
+		// make sure that we indeed have some offsets committed to Kafka
+		Long o1 = null;
+		Long o2 = null;
+		Long o3 = null;
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		while (o1 == null && o2 == null && o3 == null) {
+			Thread.sleep(100);
 			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
 			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
 			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
--- End diff ---
I would still add a hard limit on the number of attempts (20?); there is no benefit in a test that succeeds _at some point_. If it doesn't finish relatively quickly, the entire build will time out anyway.
Also, when running tests locally, I suppose there isn't even a time limit that would kill the test...
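As a rough illustration of the suggestion, the unbounded `while` loop in the diff could be capped with an attempt counter that fails fast instead of spinning forever. This is only a sketch: `OffsetLookup` is a hypothetical stand-in for `KafkaTestEnvironment.KafkaOffsetHandler`, and the attempt limit and sleep interval are illustrative values, not anything from the Flink codebase.

```java
public class BoundedOffsetPoll {

	/** Hypothetical stand-in for KafkaOffsetHandler#getCommittedOffset(topic, partition). */
	public interface OffsetLookup {
		Long getCommittedOffset(int partition);
	}

	/**
	 * Polls until at least one of the three partitions has a committed offset,
	 * but gives up after maxAttempts so a broken test fails quickly instead of
	 * hanging until the CI build itself times out.
	 */
	public static Long[] waitForCommittedOffsets(OffsetLookup lookup, int maxAttempts, long sleepMillis)
			throws InterruptedException {
		Long o1 = null;
		Long o2 = null;
		Long o3 = null;
		int attempt = 0;
		while (o1 == null && o2 == null && o3 == null) {
			if (++attempt > maxAttempts) {
				// hard limit, as suggested in the review
				throw new IllegalStateException(
					"No offsets were committed to Kafka after " + maxAttempts + " attempts");
			}
			Thread.sleep(sleepMillis);
			o1 = lookup.getCommittedOffset(0);
			o2 = lookup.getCommittedOffset(1);
			o3 = lookup.getCommittedOffset(2);
		}
		return new Long[] { o1, o2, o3 };
	}
}
```

With a limit of 20 attempts at 100 ms each, a broken test fails after roughly two seconds rather than running until an external timeout kills it.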
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---