lindong28 commented on code in PR #21589:
URL: https://github.com/apache/flink/pull/21589#discussion_r1103753156


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -479,6 +486,134 @@ public void testSupportsPausingOrResumingSplits() throws 
Exception {
         }
     }
 
+    @Test
+    public void testSupportsUsingRecordEvaluatorWithSplitsFinishedAtMiddle() 
throws Exception {
+        final Set<String> finishedSplits = new HashSet<>();
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r -> r == 7 || r == NUM_RECORDS_PER_SPLIT + 
5)) {
+            KafkaPartitionSplit split1 =
+                    new KafkaPartitionSplit(new TopicPartition(TOPIC, 0), 0, 
Integer.MAX_VALUE);
+            KafkaPartitionSplit split2 =
+                    new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 0, 
Integer.MAX_VALUE);
+            reader.addSplits(Arrays.asList(split1, split2));
+            reader.notifyNoMoreSplits();
+
+            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+            pollUntil(
+                    reader,
+                    output,
+                    () -> finishedSplits.size() == 2,
+                    "The reader cannot get the excepted result before 
timeout.");
+            InputStatus status;
+            while (true) {
+                status = reader.pollNext(output);
+                if (status == InputStatus.END_OF_INPUT) {
+                    break;
+                }
+                if (status == InputStatus.NOTHING_AVAILABLE) {
+                    reader.isAvailable().get();
+                }
+            }
+
+            assertThat(output.getEmittedRecords().size()).isEqualTo(12);
+            List<Integer> excepted =
+                    IntStream.concat(IntStream.range(0, 7), 
IntStream.range(10, 15))
+                            .boxed()
+                            .collect(Collectors.toList());
+            assertThat(finishedSplits)
+                    .containsExactly(
+                            new TopicPartition(TOPIC, 0).toString(),
+                            new TopicPartition(TOPIC, 1).toString());
+            assertThat(output.getEmittedRecords())
+                    .containsExactlyInAnyOrder(excepted.toArray(new 
Integer[12]));
+            assertThat(status).isEqualTo(END_OF_INPUT);
+        }
+    }
+
+    @Test
+    public void testSupportsUsingRecordEvaluatorWithSplitsFinishedAtEnd() 
throws Exception {
+        final Set<String> finishedSplits = new HashSet<>();
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r -> r == 9 || r == 19)) {
+            KafkaPartitionSplit split1 =
+                    new KafkaPartitionSplit(new TopicPartition(TOPIC, 0), 0, 
Integer.MAX_VALUE);
+            KafkaPartitionSplit split2 =
+                    new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 0, 
Integer.MAX_VALUE);
+            reader.addSplits(Arrays.asList(split1, split2));
+            reader.notifyNoMoreSplits();
+
+            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+            pollUntil(
+                    reader,
+                    output,
+                    () -> finishedSplits.size() == 2,
+                    "The reader cannot get the excepted result before 
timeout.");
+            InputStatus status = reader.pollNext(output);
+
+            assertThat(output.getEmittedRecords().size()).isEqualTo(18);
+            List<Integer> excepted =
+                    IntStream.concat(IntStream.range(0, 9), 
IntStream.range(10, 19))
+                            .boxed()
+                            .collect(Collectors.toList());
+            assertThat(finishedSplits)
+                    .containsExactly(
+                            new TopicPartition(TOPIC, 0).toString(),
+                            new TopicPartition(TOPIC, 1).toString());
+            assertThat(output.getEmittedRecords())
+                    .containsExactlyInAnyOrder(excepted.toArray(new 
Integer[0]));
+            assertThat(status).isEqualTo(END_OF_INPUT);
+        }
+    }
+
+    @Test
+    public void testSupportsUsingRecordEvaluatorWithUnfinishedSplit() throws 
Exception {
+        final Set<String> finishedSplits = new HashSet<>();
+
+        try (final KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.BOUNDED,
+                                "groupId",
+                                new TestingReaderContext(),
+                                finishedSplits::addAll,
+                                r -> r == 7)) {

Review Comment:
   It seems that the test is still hard to follow/understand. It is not 
self-explanatory why users would have the results that meet the `assertThat` 
conditions specified below after specifying `r -> r == 7` as the eof condition 
here.
   
   In general we can use variables instead of constant integers to show the the 
relationship between the user-specified and the expected result, similar how 
`NUM_RECORDS_PER_SPLIT` is used here [1].
   
   Maybe we can discuss offline regarding the purpose of these tests before 
updating this PR.
   
   Same for other tests added in this PR.
   
   [1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java#L356



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to