lindong28 commented on code in PR #22931: URL: https://github.com/apache/flink/pull/22931#discussion_r1280153092
########## flink-runtime/src/main/java/org/apache/flink/runtime/source/event/EndOfDataEvent.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.event; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** A source event notifying coordinator that a source operator has reached the end of data. */ +public class EndOfDataEvent implements OperatorEvent {} Review Comment: Do we still need this class? ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java: ########## @@ -2317,18 +2318,21 @@ private void testMaxConcurrentAttempts(int maxConcurrentAttempts) { new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, 1L), TASK_MANAGER_LOCATION_INFO); - final Collection<ScheduledFuture<?>> periodicScheduledTasks = - manuallyTriggeredScheduledExecutor.getActivePeriodicScheduledTask(); - assertThat(periodicScheduledTasks.size()).isOne(); + final Collection<ScheduledFuture<?>> nonPeriodicScheduledTasks = + manuallyTriggeredScheduledExecutor.getActiveNonPeriodicScheduledRunnable( + CheckpointCoordinator.ScheduledTrigger.class); Review Comment: Passing a class as the parameter to test util method appears very hacky. Is there a way to avoid doing this? For example, can we just verify that the expected instance of runnable is found in the list of the scheduled tasks? ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java: ########## @@ -284,7 +284,7 @@ private CheckpointRequestDecider decider( maxQueued); } - private static final LongConsumer NO_OP = unused -> {}; + private static final BiConsumer<Long, Long> NO_OP = (unused, unused2) -> {}; Review Comment: It seems hack to name variables as unused2. How about using this: `(currentTimeMillis, tillNextMillis) -> {}` ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ########## @@ -572,6 +577,7 @@ private static final class LazyInitializedCoordinatorContext private GlobalFailureHandler globalFailureHandler; private Executor schedulerExecutor; private OperatorCoordinatorMetricGroup metricGroup; + private CheckpointCoordinator checkpointCoordinator; Review Comment: Given that it can be null, can we mark it nullable? Same for related method parameter. ########## flink-core/src/test/java/org/apache/flink/util/concurrent/ManuallyTriggeredScheduledExecutor.java: ########## @@ -94,6 +94,11 @@ public Collection<ScheduledFuture<?>> getActiveNonPeriodicScheduledTask() { return execService.getActiveNonPeriodicScheduledTask(); } + public Collection<ScheduledFuture<?>> getActiveNonPeriodicScheduledRunnable( Review Comment: Should we replace `Runnable` with `Task` for consistency with the existing method names? ########## flink-tests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceBlockableByCheckpoint.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.core.io.InputStatus; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +/** + * A {@link NumberSequenceSource} that can be blocked by checkpoints. This source can be used to Review Comment: It is hard to understand when/how to use this class by just reading this doc. For example, how do the values of `isBlockByCheckpoint` and `isBacklog` affect the behavior of this Source. Can you update the doc to clarify its semantics? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
