Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r170966200

    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws Exception {
             partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49
             partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49
    -        readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
    +        readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
             kafkaOffsetHandler.close();
             deleteTestTopic(topicName);
         }
    +    /**
    +     * This test ensures that the consumer correctly uses user-supplied timestamp when explicitly configured to
    +     * start from timestamp.
    +     *
    +     * <p>The validated Kafka data is written in 2 steps: first, an initial 50 records is written to each partition.
    +     * After that, another 30 records is appended to each partition. Before each step, a timestamp is recorded.
    +     * For the validation, when the read job is configured to start from the first timestamp, each partition should start
    +     * from offset 0 and read a total of 80 records. When configured to start from the second timestamp,
    +     * each partition should start from offset 50 and read on the remaining 30 appended records.
    +     */
    +    public void runStartFromTimestamp() throws Exception {
    +        // 4 partitions with 50 records each
    +        final int parallelism = 4;
    +        final int initialRecordsInEachPartition = 50;
    +        final int appendRecordsInEachPartition = 30;
    +
    +        long firstTimestamp = 0;
    +        long secondTimestamp = 0;
    +        String topic = "";
    +
    +        // attempt to create an appended test sequence, where the timestamp of writing the appended sequence
    +        // is assured to be larger than the timestamp of the original sequence.
    +        final int maxRetries = 3;
    +        int attempt = 0;
    +        while (attempt != maxRetries) {
    +            firstTimestamp = System.currentTimeMillis();
    +            topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1);
    --- End diff --

    Ah, I just thought that we could have a simple loop there:
    ```
    long secondTimestamp = System.currentTimeMillis();
    while (secondTimestamp <= firstTimestamp) {
        Thread.sleep(1); // Thread.sleep needs a duration; 1 ms is enough for the clock to advance
        secondTimestamp = System.currentTimeMillis();
    }
    ```
    What do you think?
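For reference, the suggested loop could be pulled into a small helper so the test does not need the retry logic. This is only a sketch: the class name `TimestampWait`, the method name `waitForLaterTimestamp`, and the 1 ms sleep interval are illustrative assumptions and not part of the pull request. It also assumes the wall clock does not jump backwards while waiting.

```java
/** Hypothetical helper, not part of the Flink code base. */
public class TimestampWait {

    /**
     * Blocks until System.currentTimeMillis() is strictly greater than
     * the given timestamp, then returns the newly observed value.
     */
    public static long waitForLaterTimestamp(long earlier) throws InterruptedException {
        long now = System.currentTimeMillis();
        while (now <= earlier) {
            // Assumed interval: sleep 1 ms between checks to avoid busy-waiting.
            Thread.sleep(1);
            now = System.currentTimeMillis();
        }
        return now;
    }
}
```

In the test, `secondTimestamp = TimestampWait.waitForLaterTimestamp(firstTimestamp);` placed just before writing the appended records would then guarantee `secondTimestamp > firstTimestamp` without retrying the whole write.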
---