1996fanrui commented on code in PR #24757: URL: https://github.com/apache/flink/pull/24757#discussion_r1626888044
########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +/** This ITCase class tests the behavior of task execution with watermark alignment. 
*/ +public class WatermarkAlignmentITCase { Review Comment: ```suggestion class WatermarkAlignmentITCase { ``` ########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +/** This ITCase class tests the behavior of task execution with watermark alignment. 
*/ +public class WatermarkAlignmentITCase { + + /** + * Test method to verify whether the watermark alignment works well with finished task. + * + * @throws Exception if any error occurs during the execution. + */ + @Test + public void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { Review Comment: ```suggestion void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { ``` JUnit 5 doesn't require test classes and test methods to be public. ########## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ########## @@ -310,6 +310,53 @@ void testWatermarkAggregatorRandomly() { testWatermarkAggregatorRandomly(10, 10000, true, false); } + @Test + void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { + long maxDrift = 1000L; + WatermarkAlignmentParams params = + new WatermarkAlignmentParams(maxDrift, "group1", maxDrift); + + final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = + createMockSource(); + + sourceCoordinator = + new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>( + OPERATOR_NAME, + mockSource, + getNewSourceCoordinatorContext(), + new CoordinatorStoreImpl(), + params, + null) { + @Override + void announceCombinedWatermark() { + super.announceCombinedWatermark(); + } Review Comment: The override only calls `super.announceCombinedWatermark();` directly, so I don't see why we need to override it. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ########## @@ -310,6 +310,53 @@ void testWatermarkAggregatorRandomly() { testWatermarkAggregatorRandomly(10, 10000, true, false); } + @Test + void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { + long maxDrift = 1000L; + WatermarkAlignmentParams params = + new WatermarkAlignmentParams(maxDrift, "group1", maxDrift); + + final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = + createMockSource(); + + sourceCoordinator = + new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>( + OPERATOR_NAME, + mockSource, + getNewSourceCoordinatorContext(), + new CoordinatorStoreImpl(), + params, + null) { + @Override + void announceCombinedWatermark() { + super.announceCombinedWatermark(); + } + }; + + sourceCoordinator.start(); + + int subtask0 = 0; + int subtask1 = 1; + + setReaderTaskReady(sourceCoordinator, subtask0, 0); + setReaderTaskReady(sourceCoordinator, subtask1, 0); + registerReader(subtask0); + registerReader(subtask1); + + reportWatermarkEvent(sourceCoordinator, subtask0, 42); + assertLatestWatermarkAlignmentEvent(subtask0, 1042); + + reportWatermarkEvent(sourceCoordinator, subtask1, 44); + assertLatestWatermarkAlignmentEvent(subtask1, 1042); + + // mock noMoreSplits event + assertHasNoMoreSplits(subtask0, true); Review Comment: I'm not sure what `testWatermarkAlignmentWhileSubtaskFinished` is meant to test. IIUC, the test still passes even if we don't call `assertHasNoMoreSplits(subtask0, true);` (i.e. remove this line) here, right? 
########## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ########## @@ -377,4 +424,9 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater assertThat(events.get(events.size() - 1)) .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark)); } + + private void assertHasNoMoreSplits(int subtask, boolean expected) { Review Comment: The method name is `assertHasNoMoreSplits`, but it triggers `signalNoMoreSplits` as well. I think `sourceCoordinator.getContext().signalNoMoreSplits(subtask);` should be moved out of this method. Also, `assertHasNoMoreSplits` is only called once, and the method only has one line. I'm not sure whether we need to extract it as a separate method. The caller can call `assertThat(sourceCoordinator.getContext().hasNoMoreSplits(subtask)).isEqualTo(expected);` directly. ########## flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java: ########## @@ -377,4 +424,9 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater assertThat(events.get(events.size() - 1)) .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark)); } + + private void assertHasNoMoreSplits(int subtask, boolean expected) { + sourceCoordinator.getContext().signalNoMoreSplits(0); Review Comment: ```suggestion sourceCoordinator.getContext().signalNoMoreSplits(subtask); ``` We should use subtask instead of 0 here. ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ########## @@ -201,8 +201,11 @@ void announceCombinedWatermark() { // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule // the period task if it throws an exception). Review Comment: nit: It's better to move the old comments into the `sendEventToSourceOperatorIfTaskReady` method, right? You can check these 2 comments in the current PR; they read a little weird. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
