1996fanrui commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1626888044


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/** This ITCase class tests the behavior of task execution with watermark alignment. */
+public class WatermarkAlignmentITCase {

Review Comment:
   ```suggestion
   class WatermarkAlignmentITCase {
   ```



##########
flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/** This ITCase class tests the behavior of task execution with watermark alignment. */
+public class WatermarkAlignmentITCase {
+
+    /**
+     * Test method to verify whether the watermark alignment works well with finished task.
+     *
+     * @throws Exception if any error occurs during the execution.
+     */
+    @Test
+    public void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {

Review Comment:
   ```suggestion
       void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {
   ```
   
   JUnit 5 doesn't require test classes or test methods to be `public`.
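
As a rough illustration of why `public` is unnecessary, a reflection-based runner can discover and invoke package-private methods. The sketch below is not JUnit's implementation; the `SketchTest` annotation, `SampleCase` class, and `runAnnotatedMethods` helper are hypothetical names invented for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class PackagePrivateTestDiscovery {

    /** Hypothetical marker annotation standing in for a framework's test annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface SketchTest {}

    /** Package-private class with a package-private annotated method. */
    static class SampleCase {
        @SketchTest
        void packagePrivateTest() {
            // A real test body would contain assertions.
        }
    }

    /** Instantiates the class reflectively and invokes every annotated method. */
    public static List<String> runAnnotatedMethods(Class<?> testClass) {
        List<String> executed = new ArrayList<>();
        try {
            Constructor<?> constructor = testClass.getDeclaredConstructor();
            constructor.setAccessible(true); // allow non-public constructors
            Object instance = constructor.newInstance();
            for (Method method : testClass.getDeclaredMethods()) {
                if (method.isAnnotationPresent(SketchTest.class)) {
                    method.setAccessible(true); // allow non-public methods
                    method.invoke(instance);
                    executed.add(method.getName());
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Reflective invocation failed", e);
        }
        return executed;
    }

    /** Convenience entry point that runs the package-private sample class. */
    public static List<String> runSample() {
        return runAnnotatedMethods(SampleCase.class);
    }

    public static void main(String[] args) {
        System.out.println(runSample()); // prints [packagePrivateTest]
    }
}
```

Neither the class nor the method is `public`, yet the runner still finds and executes the method.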



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -310,6 +310,53 @@ void testWatermarkAggregatorRandomly() {
         testWatermarkAggregatorRandomly(10, 10000, true, false);
     }
 
+    @Test
+    void testWatermarkAlignmentWhileSubtaskFinished() throws Exception {
+        long maxDrift = 1000L;
+        WatermarkAlignmentParams params =
+                new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
+
+        final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
+                createMockSource();
+
+        sourceCoordinator =
+                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
+                        OPERATOR_NAME,
+                        mockSource,
+                        getNewSourceCoordinatorContext(),
+                        new CoordinatorStoreImpl(),
+                        params,
+                        null) {
+                    @Override
+                    void announceCombinedWatermark() {
+                        super.announceCombinedWatermark();
+                    }

Review Comment:
   
   
   This override only calls `super.announceCombinedWatermark()`, so it doesn't change any behavior. Why do we need to override it?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -310,6 +310,53 @@ void testWatermarkAggregatorRandomly() {
         testWatermarkAggregatorRandomly(10, 10000, true, false);
     }
 
+    @Test
+    void testWatermarkAlignmentWhileSubtaskFinished() throws Exception {
+        long maxDrift = 1000L;
+        WatermarkAlignmentParams params =
+                new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
+
+        final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
+                createMockSource();
+
+        sourceCoordinator =
+                new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
+                        OPERATOR_NAME,
+                        mockSource,
+                        getNewSourceCoordinatorContext(),
+                        new CoordinatorStoreImpl(),
+                        params,
+                        null) {
+                    @Override
+                    void announceCombinedWatermark() {
+                        super.announceCombinedWatermark();
+                    }
+                };
+
+        sourceCoordinator.start();
+
+        int subtask0 = 0;
+        int subtask1 = 1;
+
+        setReaderTaskReady(sourceCoordinator, subtask0, 0);
+        setReaderTaskReady(sourceCoordinator, subtask1, 0);
+        registerReader(subtask0);
+        registerReader(subtask1);
+
+        reportWatermarkEvent(sourceCoordinator, subtask0, 42);
+        assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+
+        reportWatermarkEvent(sourceCoordinator, subtask1, 44);
+        assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+        // mock noMoreSplits event
+        assertHasNoMoreSplits(subtask0, true);

Review Comment:
   I'm not sure what `testWatermarkAlignmentWhileSubtaskFinished` is meant to test.
   
   IIUC, the test still passes even if we remove the `assertHasNoMoreSplits(subtask0, true);` call here, right?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -377,4 +424,9 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater
         assertThat(events.get(events.size() - 1))
                 .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark));
     }
+
+    private void assertHasNoMoreSplits(int subtask, boolean expected) {

Review Comment:
   The method is named `assertHasNoMoreSplits`, but it also triggers `signalNoMoreSplits`.
   
   I think `sourceCoordinator.getContext().signalNoMoreSplits(subtask);` should be moved out of this method.
   
   Also, `assertHasNoMoreSplits` is called only once, and it would then contain only one line, so I'm not sure it needs to be extracted as a separate method. The caller can call `assertThat(sourceCoordinator.getContext().hasNoMoreSplits(subtask)).isEqualTo(expected);` directly.
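
The separation suggested above could look roughly like the following sketch. `FakeContext` is a hypothetical stand-in written for this example, not Flink's coordinator context; only the method names `signalNoMoreSplits` and `hasNoMoreSplits` are taken from the comment.

```java
import java.util.HashSet;
import java.util.Set;

public class NoMoreSplitsSketch {

    /** Hypothetical stand-in for the coordinator context; not Flink's real class. */
    public static final class FakeContext {
        private final Set<Integer> finishedSubtasks = new HashSet<>();

        /** Records that the given subtask will receive no more splits. */
        public void signalNoMoreSplits(int subtask) {
            finishedSubtasks.add(subtask);
        }

        /** Pure query with no side effects. */
        public boolean hasNoMoreSplits(int subtask) {
            return finishedSubtasks.contains(subtask);
        }
    }

    public static void main(String[] args) {
        FakeContext context = new FakeContext();
        int subtask0 = 0;
        int subtask1 = 1;

        // Act: the side effect is visible at the call site and names the subtask explicitly.
        context.signalNoMoreSplits(subtask1);

        // Assert: plain queries, so nothing is mutated while checking.
        System.out.println(context.hasNoMoreSplits(subtask1)); // prints true
        System.out.println(context.hasNoMoreSplits(subtask0)); // prints false
    }
}
```

Keeping the signal at the call site makes it obvious which subtask is affected, and the check no longer changes the state it inspects.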



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -377,4 +424,9 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater
         assertThat(events.get(events.size() - 1))
                 .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark));
     }
+
+    private void assertHasNoMoreSplits(int subtask, boolean expected) {
+        sourceCoordinator.getContext().signalNoMoreSplits(0);

Review Comment:
   ```suggestion
           sourceCoordinator.getContext().signalNoMoreSplits(subtask);
   ```
   
   We should use `subtask` instead of `0` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
         // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
         // the period task if it throws an exception).

Review Comment:
   nit:
   
   It would be better to move the old comments into the `sendEventToSourceOperatorIfTaskReady` method, right?
   
   You can check these two comments in the current PR; they look a little weird.
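
For context, the JDK behavior that the quoted code comment refers to can be reproduced with a plain `ScheduledExecutorService`. The sketch below uses only standard library classes and is not Flink code; the class and method names are made up for this example. A periodic task that throws is never scheduled again, while a task that catches its own exception keeps running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskFailureSketch {

    /** Runs a periodic task that always throws and returns how often it executed. */
    public static int runsOfThrowingTask() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future =
                    executor.scheduleAtFixedRate(
                            () -> {
                                runs.incrementAndGet();
                                throw new IllegalStateException("task not ready");
                            },
                            0,
                            10,
                            TimeUnit.MILLISECONDS);
            try {
                future.get(); // returns control once the first run has failed
            } catch (ExecutionException expected) {
                // The executor has suppressed all subsequent executions.
            }
            Thread.sleep(100); // several periods elapse without another run
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return runs.get();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Runs a periodic task that catches its own failure; reports whether it ran three times. */
    public static boolean guardedTaskKeepsRunning() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            executor.scheduleAtFixedRate(
                    () -> {
                        try {
                            throw new IllegalStateException("task not ready");
                        } catch (RuntimeException e) {
                            // Swallowed here; a real task would log or skip the send.
                        } finally {
                            threeRuns.countDown();
                        }
                    },
                    0,
                    10,
                    TimeUnit.MILLISECONDS);
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOfThrowingTask()); // prints 1
        System.out.println(guardedTaskKeepsRunning());
    }
}
```

This is why a guard inside the periodic task matters: one uncaught exception silently ends the schedule.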


