This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38255652406becbfbcb7cbec557aa5ba9a1ebbb3
Author: JunRuiLee <[email protected]>
AuthorDate: Fri Mar 1 11:03:42 2024 +0800

    [FLINK-33984][runtime] Skip maintain subtaskGatewayMap when 
CheckpointCoordinator is null.
---
 .../coordination/OperatorCoordinatorHolder.java    | 11 +++--
 .../OperatorCoordinatorHolderTest.java             | 48 +++++++++++++++++++++-
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index f3253f147db..8aff60fb766 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -110,9 +110,9 @@ public class OperatorCoordinatorHolder
 
     /**
      * A map that manages subtask gateways. It is used to control the 
opening/closing of each
-     * gateway during checkpoint. This map should only be read or modified 
when concurrent execution
-     * attempt is disabled. Note that concurrent execution attempt is 
currently guaranteed to be
-     * disabled when checkpoint is enabled.
+     * gateway during checkpoints. This map should only be read or modified in 
Streaming mode. Given
+     * that the CheckpointCoordinator is guaranteed to be non-null in 
Streaming mode, construction
+     * of this map can be skipped if the CheckpointCoordinator is null.
      */
     private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap;
 
@@ -437,9 +437,8 @@ public class OperatorCoordinatorHolder
         final SubtaskGatewayImpl gateway =
                 new SubtaskGatewayImpl(sta, mainThreadExecutor, 
unconfirmedEvents);
 
-        // When concurrent execution attempts is supported, the checkpoint 
must have been disabled.
-        // Thus, we don't need to maintain subtaskGatewayMap
-        if (!context.isConcurrentExecutionAttemptsSupported()) {
+        // We don't need to maintain subtaskGatewayMap when checkpoint 
coordinator is null.
+        if (context.getCheckpointCoordinator() != null) {
             subtaskGatewayMap.put(gateway.getSubtask(), gateway);
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index 95841dcb634..278214f8901 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -18,21 +18,36 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask;
 import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
+import org.apache.commons.collections.iterators.IteratorChain;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
@@ -42,6 +57,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
@@ -528,7 +544,37 @@ class OperatorCoordinatorHolderTest {
                                 new Configuration()),
                         
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
 
-        holder.lazyInitialize(globalFailureHandler, mainThreadExecutor, null);
+        JobID jobId = new JobID();
+        holder.lazyInitialize(
+                globalFailureHandler,
+                mainThreadExecutor,
+                new CheckpointCoordinator(
+                        jobId,
+                        CheckpointCoordinatorConfiguration.builder().build(),
+                        Collections.emptyList(),
+                        new StandaloneCheckpointIDCounter(),
+                        new StandaloneCompletedCheckpointStore(10),
+                        new MemoryStateBackend(),
+                        mainThreadExecutor,
+                        new CheckpointsCleaner(),
+                        mainThreadExecutor,
+                        new CheckpointFailureManager(0, 
NoOpFailJobCall.INSTANCE),
+                        new DefaultCheckpointPlanCalculator(
+                                jobId,
+                                new CheckpointPlanCalculatorContext() {
+                                    @Override
+                                    public ScheduledExecutor getMainExecutor() 
{
+                                        return null;
+                                    }
+
+                                    @Override
+                                    public boolean hasFinishedTasks() {
+                                        return false;
+                                    }
+                                },
+                                IteratorChain::new,
+                                false),
+                        new CheckpointStatsTracker(1, new 
UnregisteredMetricsGroup(), jobId)));
         holder.start();
 
         return holder;

Reply via email to