This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 70fecdd  [FLINK-20469][minicluster] Enable TaskManager start and terminate in MiniCluster
70fecdd is described below

commit 70fecdd515b7d147bb40dad04cca3896bbc3479a
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Wed Dec 2 11:37:07 2020 +0300

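    [FLINK-20469][minicluster] Enable TaskManager start and terminate in MiniCluster
    
    Until now, starting and terminating individual TaskManagers was exposed only
    through the internal TestingMiniCluster (as startTaskExecutor/terminateTaskExecutor).
    These methods are nonetheless useful for implementing IT cases similar to E2E tests,
    so this change makes them public in MiniCluster as startTaskManager/terminateTaskManager.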
    
    This closes #14300.
---
 .../file/src/FileSourceTextLinesITCase.java        |  4 +-
 .../flink/runtime/minicluster/MiniCluster.java     | 35 ++++++++++++----
 .../runtime/minicluster/TestingMiniCluster.java    | 12 ------
 .../runtime/taskexecutor/TaskExecutorITCase.java   | 21 +++++-----
 .../recovery/BatchFineGrainedRecoveryITCase.java   | 48 +++++++++++-----------
 5 files changed, 62 insertions(+), 58 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index ce68f93..9075258 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -314,9 +314,9 @@ public class FileSourceTextLinesITCase extends TestLogger {
        }
 
        private static void restartTaskManager(Runnable afterFailAction) throws 
Exception {
-               miniCluster.terminateTaskExecutor(0).get();
+               miniCluster.terminateTaskManager(0).get();
                afterFailAction.run();
-               miniCluster.startTaskExecutor();
+               miniCluster.startTaskManager();
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 24cccb6..b3900a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -454,7 +454,7 @@ public class MiniCluster implements AutoCloseableAsync {
                                        final int numComponents = 2 + 
miniClusterConfiguration.getNumTaskManagers();
                                        final 
Collection<CompletableFuture<Void>> componentTerminationFutures = new 
ArrayList<>(numComponents);
 
-                                       
componentTerminationFutures.addAll(terminateTaskExecutors());
+                                       
componentTerminationFutures.addAll(terminateTaskManagers());
 
                                        
componentTerminationFutures.add(shutDownResourceManagerComponents());
 
@@ -519,12 +519,20 @@ public class MiniCluster implements AutoCloseableAsync {
                LOG.info("Starting {} TaskManger(s)", numTaskManagers);
 
                for (int i = 0; i < numTaskManagers; i++) {
-                       startTaskExecutor();
+                       startTaskManager();
                }
        }
 
-       @VisibleForTesting
-       void startTaskExecutor() throws Exception {
+       /**
+        * Starts an additional TaskManager.
+        *
+        * <p>On startup, the MiniCluster always starts {@link MiniClusterConfiguration#getNumTaskManagers}
+        * TaskManagers. TaskManagers are indexed from 0 up to the number of TaskManagers started so far minus one.
+        * This method starts a TaskManager whose index equals the number of TaskManagers started so far.
+        * The index increases with every newly started TaskManager. Indices of TaskManagers terminated
+        * via {@link #terminateTaskManager(int)} are not reused.
+        */
+       public void startTaskManager() throws Exception {
                synchronized (lock) {
                        final Configuration configuration = 
miniClusterConfiguration.getConfiguration();
 
@@ -551,18 +559,27 @@ public class MiniCluster implements AutoCloseableAsync {
        }
 
        @GuardedBy("lock")
-       private Collection<? extends CompletableFuture<Void>> 
terminateTaskExecutors() {
+       private Collection<? extends CompletableFuture<Void>> 
terminateTaskManagers() {
                final Collection<CompletableFuture<Void>> terminationFutures = 
new ArrayList<>(taskManagers.size());
                for (int i = 0; i < taskManagers.size(); i++) {
-                       terminationFutures.add(terminateTaskExecutor(i));
+                       terminationFutures.add(terminateTaskManager(i));
                }
 
                return terminationFutures;
        }
 
-       @VisibleForTesting
-       @Nonnull
-       protected CompletableFuture<Void> terminateTaskExecutor(int index) {
+       /**
+        * Terminates the TaskManager with the given index.
+        *
+        * <p>See {@link #startTaskManager()} for how TaskManagers are indexed.
+        * This method terminates the TaskManager with the given index but does not free the index:
+        * the index stays occupied for the lifetime of the MiniCluster, and its TaskManager stays terminated.
+        * The index is not reused when more TaskManagers are started with {@link #startTaskManager()}.
+        *
+        * @param index index of the TaskManager to terminate
+        * @return {@link CompletableFuture} that completes when the given TaskManager has terminated
+        */
+       public CompletableFuture<Void> terminateTaskManager(int index) {
                synchronized (lock) {
                        final TaskExecutor taskExecutor = 
taskManagers.get(index);
                        return taskExecutor.closeAsync();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index affd334..5f3c46c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,17 +64,6 @@ public class TestingMiniCluster extends MiniCluster {
                this(miniClusterConfiguration, null);
        }
 
-       @Nonnull
-       @Override
-       public CompletableFuture<Void> terminateTaskExecutor(int index) {
-               return super.terminateTaskExecutor(index);
-       }
-
-       @Override
-       public void startTaskExecutor() throws Exception {
-               super.startTaskExecutor();
-       }
-
        @Override
        protected boolean useLocalCommunication() {
                return localCommunication;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index f7c882b..6b5a841 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.SupplierWithException;
@@ -64,16 +64,15 @@ public class TaskExecutorITCase extends TestLogger {
        private static final int SLOTS_PER_TM = 2;
        private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
 
-       private TestingMiniCluster miniCluster;
+       private MiniCluster miniCluster;
 
        @Before
        public void setup() throws Exception  {
-               miniCluster = new TestingMiniCluster(
-                       new TestingMiniClusterConfiguration.Builder()
+               miniCluster = new MiniCluster(
+                       new MiniClusterConfiguration.Builder()
                                .setNumTaskManagers(NUM_TMS)
                                .setNumSlotsPerTaskManager(SLOTS_PER_TM)
-                               .build(),
-                       null);
+                               .build());
 
                miniCluster.start();
        }
@@ -96,13 +95,13 @@ public class TaskExecutorITCase extends TestLogger {
                final CompletableFuture<JobResult> jobResultFuture = 
submitJobAndWaitUntilRunning(jobGraph);
 
                // kill one TaskExecutor which should fail the job execution
-               miniCluster.terminateTaskExecutor(0);
+               miniCluster.terminateTaskManager(0);
 
                final JobResult jobResult = jobResultFuture.get();
 
                assertThat(jobResult.isSuccess(), is(false));
 
-               miniCluster.startTaskExecutor();
+               miniCluster.startTaskManager();
 
                final JobGraph newJobGraph = createJobGraph(PARALLELISM);
                BlockingOperator.unblock();
@@ -121,9 +120,9 @@ public class TaskExecutorITCase extends TestLogger {
                final CompletableFuture<JobResult> jobResultFuture = 
submitJobAndWaitUntilRunning(jobGraph);
 
                // start an additional TaskExecutor
-               miniCluster.startTaskExecutor();
+               miniCluster.startTaskManager();
 
-               miniCluster.terminateTaskExecutor(0).get(); // this should fail 
the job
+               miniCluster.terminateTaskManager(0).get(); // this should fail 
the job
 
                BlockingOperator.unblock();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index 028a9d0..024532e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -34,9 +34,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import 
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import 
org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration.Builder;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -52,6 +50,8 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
@@ -64,6 +64,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -165,7 +166,16 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
        private static final List<Long> EXPECTED_JOB_OUTPUT =
                LongStream.range(MAP_NUMBER, EMITTED_RECORD_NUMBER + 
MAP_NUMBER).boxed().collect(Collectors.toList());
 
-       private static TestingMiniCluster miniCluster;
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+               new MiniClusterResourceConfiguration
+                       .Builder()
+                       .setConfiguration(createConfiguration())
+                       .setNumberTaskManagers(1)
+                       .setNumberSlotsPerTaskManager(1)
+                       .build());
+
+       private static MiniCluster miniCluster;
 
        private static MiniClusterClient client;
 
@@ -178,19 +188,7 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
        @SuppressWarnings("OverlyBroadThrowsClause")
        @Before
        public void setup() throws Exception {
-               Configuration configuration = new Configuration();
-               
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, 
PIPELINED_REGION_RESTART_STRATEGY_NAME);
-
-               miniCluster = new TestingMiniCluster(
-                       new Builder()
-                               .setNumTaskManagers(1)
-                               .setNumSlotsPerTaskManager(1)
-                               .setConfiguration(configuration)
-                               
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                               .build(),
-                       null);
-               miniCluster.start();
-
+               miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
                client = new MiniClusterClient(miniCluster);
 
                lastTaskManagerIndexInMiniCluster = new AtomicInteger(0);
@@ -200,10 +198,6 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
 
        @After
        public void teardown() throws Exception {
-               if (miniCluster != null) {
-                       miniCluster.close();
-               }
-
                if (client != null) {
                        client.close();
                }
@@ -224,6 +218,12 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
                failureTracker.verify(getMapperAttempts());
        }
 
+       private static Configuration createConfiguration() {
+               Configuration configuration = new Configuration();
+               
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, 
PIPELINED_REGION_RESTART_STRATEGY_NAME);
+               return configuration;
+       }
+
        private static FailureStrategy createFailureStrategy(int trackingIndex) 
{
                int failWithExceptionAfterNumberOfProcessedRecords = 
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
                int failTaskExecutorAfterNumberOfProcessedRecords = 
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
@@ -250,9 +250,9 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
        private static void restartTaskManager() throws Exception {
                int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
                try {
-                       miniCluster.terminateTaskExecutor(tmi).get();
+                       miniCluster.terminateTaskManager(tmi).get();
                } finally {
-                       miniCluster.startTaskExecutor();
+                       miniCluster.startTaskManager();
                }
        }
 
@@ -525,7 +525,7 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
                private final ExecutorService executorService;
                private final URI restAddress;
 
-               private MiniClusterClient(TestingMiniCluster miniCluster) 
throws ConfigurationException {
+               private MiniClusterClient(MiniCluster miniCluster) throws 
ConfigurationException {
                        restAddress = miniCluster.getRestAddress().join();
                        executorService = 
Executors.newSingleThreadScheduledExecutor(new 
ExecutorThreadFactory("Flink-RestClient-IO"));
                        restClient = createRestClient();

Reply via email to