Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4190#discussion_r124961658
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {
 		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
 
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		// read some records so that some offsets are committed to Kafka
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
 
-		Long o1;
-		Long o2;
-		Long o3;
-		int attempt = 0;
-
-		// make sure that o1, o2, o3 are not all null before proceeding
-		do {
-			attempt++;
-			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.getConfig().disableSysoutLogging();
-			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-			env
-				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-				.map(new ThrottledMapper<String>(consumePause))
-				.map(new MapFunction<String, Object>() {
-					int count = 0;
-					@Override
-					public Object map(String value) throws Exception {
-						count++;
-						if (count == recordsToConsume) {
-							throw new SuccessException();
-						}
-						return null;
-					}
-				})
-				.addSink(new DiscardingSink<>());
+		env
+			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+			.map(new ThrottledMapper<String>(consumePause))
+			.map(new MapFunction<String, Object>() {
+				int count = 0;
+				@Override
+				public Object map(String value) throws Exception {
+					count++;
+					if (count == recordsToConsume) {
+						throw new SuccessException();
+					}
+					return null;
+				}
+			})
+			.addSink(new DiscardingSink<>());
 
-			tryExecute(env, "Read some records to commit offsets to Kafka");
+		tryExecute(env, "Read some records to commit offsets to Kafka");
 
+		// make sure that we indeed have some offsets committed to Kafka
+		Long o1 = null;
+		Long o2 = null;
+		Long o3 = null;
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+		while (o1 == null && o2 == null && o3 == null) {
+			Thread.sleep(100);
 			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
 			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
 			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
--- End diff ---
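In essence, the new code replaces the bounded retry loop with an unbounded wait for at least one committed offset. Distilled into a standalone sketch (the OffsetHandler interface below is a stand-in for KafkaTestEnvironment.KafkaOffsetHandler; everything else follows the diff):

    // Distilled sketch of the wait-for-committed-offsets pattern above; not
    // the actual test code. OffsetHandler stands in for
    // KafkaTestEnvironment.KafkaOffsetHandler, whose getCommittedOffset(...)
    // returns null until an offset has been committed for that partition.
    interface OffsetHandler {
        Long getCommittedOffset(String topicName, int partition);
    }

    class CommittedOffsetWaiter {
        // Blocks until at least one of the topic's three partitions has a
        // committed offset. Like the new while-loop in the diff, this spins
        // forever if nothing is ever committed; there is no attempt limit.
        static void waitForAnyCommittedOffset(OffsetHandler handler, String topicName) throws InterruptedException {
            Long o1 = null;
            Long o2 = null;
            Long o3 = null;
            while (o1 == null && o2 == null && o3 == null) {
                Thread.sleep(100); // brief pause between polls
                o1 = handler.getCommittedOffset(topicName, 0);
                o2 = handler.getCommittedOffset(topicName, 1);
                o3 = handler.getCommittedOffset(topicName, 2);
            }
        }
    }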
I understand the argument. Perhaps part of the problem is that this test is
trying to cover too much in a single test, hence the awkwardness in making it
stable.
I think it would be sufficient to have 2 separate tests replace it:
(a) a test that the offsets committed to Kafka are correct (there is already an
ITCase for this), and
(b) a test that committed offsets are correctly picked up and used on startup
(there is actually also a test for this already).
Hence, I would conclude that this test can perhaps be removed.
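To make the proposed split concrete, a hypothetical outline of the two tests (the class name, method names, and bodies below are illustrative only, not the existing ITCases referred to above):

    import org.junit.Test;

    // Hypothetical outline of the split proposed above; the names are made
    // up, and the comment says equivalents already exist in the test suite.
    public class KafkaCommittedOffsetSplitOutline {

        // (a) committed offsets are correct: run a checkpointed job that
        // consumes a known number of records, then assert on
        // kafkaOffsetHandler.getCommittedOffset(topic, partition).
        @Test
        public void testCommittedOffsetsAreCorrect() throws Exception {
            // covered by an existing ITCase, per the comment above
        }

        // (b) committed offsets are picked up: pre-commit known offsets,
        // start a consumer configured to start from group offsets, and
        // assert that reading resumes from the committed positions.
        @Test
        public void testStartFromCommittedGroupOffsets() throws Exception {
            // also covered by an existing test, per the comment above
        }
    }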