This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 70fecdd [FLINK-20469][minicluster] Enable TaskManager start and
terminate in MiniCluster
70fecdd is described below
commit 70fecdd515b7d147bb40dad04cca3896bbc3479a
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Wed Dec 2 11:37:07 2020 +0300
[FLINK-20469][minicluster] Enable TaskManager start and terminate in
MiniCluster
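Currently we expose startTaskExecutor/terminateTaskExecutor only in the internal
TestingMiniCluster.
Nonetheless, they are useful methods for implementing IT cases similar to E2E
tests, so this change makes them public in MiniCluster as
startTaskManager/terminateTaskManager.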
This closes #14300.
---
.../file/src/FileSourceTextLinesITCase.java | 4 +-
.../flink/runtime/minicluster/MiniCluster.java | 35 ++++++++++++----
.../runtime/minicluster/TestingMiniCluster.java | 12 ------
.../runtime/taskexecutor/TaskExecutorITCase.java | 21 +++++-----
.../recovery/BatchFineGrainedRecoveryITCase.java | 48 +++++++++++-----------
5 files changed, 62 insertions(+), 58 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index ce68f93..9075258 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -314,9 +314,9 @@ public class FileSourceTextLinesITCase extends TestLogger {
}
private static void restartTaskManager(Runnable afterFailAction) throws
Exception {
- miniCluster.terminateTaskExecutor(0).get();
+ miniCluster.terminateTaskManager(0).get();
afterFailAction.run();
- miniCluster.startTaskExecutor();
+ miniCluster.startTaskManager();
}
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 24cccb6..b3900a3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -454,7 +454,7 @@ public class MiniCluster implements AutoCloseableAsync {
final int numComponents = 2 +
miniClusterConfiguration.getNumTaskManagers();
final
Collection<CompletableFuture<Void>> componentTerminationFutures = new
ArrayList<>(numComponents);
-
componentTerminationFutures.addAll(terminateTaskExecutors());
+
componentTerminationFutures.addAll(terminateTaskManagers());
componentTerminationFutures.add(shutDownResourceManagerComponents());
@@ -519,12 +519,20 @@ public class MiniCluster implements AutoCloseableAsync {
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
for (int i = 0; i < numTaskManagers; i++) {
- startTaskExecutor();
+ startTaskManager();
}
}
- @VisibleForTesting
- void startTaskExecutor() throws Exception {
+ /**
+ * Starts an additional TaskManager process.
+ *
+ * <p>When the MiniCluster starts up, it always starts {@link
MiniClusterConfiguration#getNumTaskManagers}
+ * TaskManagers. All TaskManagers are indexed from 0 to the number of
TaskManagers started so far, minus one.
+ * This method starts a TaskManager with the next index, which is the
number of TaskManagers started so far.
+ * The index always increases with each new started TaskManager. The
indices of terminated TaskManagers
+ * are not reused after {@link #terminateTaskManager(int)}.
+ */
+ public void startTaskManager() throws Exception {
synchronized (lock) {
final Configuration configuration =
miniClusterConfiguration.getConfiguration();
@@ -551,18 +559,27 @@ public class MiniCluster implements AutoCloseableAsync {
}
@GuardedBy("lock")
- private Collection<? extends CompletableFuture<Void>>
terminateTaskExecutors() {
+ private Collection<? extends CompletableFuture<Void>>
terminateTaskManagers() {
final Collection<CompletableFuture<Void>> terminationFutures =
new ArrayList<>(taskManagers.size());
for (int i = 0; i < taskManagers.size(); i++) {
- terminationFutures.add(terminateTaskExecutor(i));
+ terminationFutures.add(terminateTaskManager(i));
}
return terminationFutures;
}
- @VisibleForTesting
- @Nonnull
- protected CompletableFuture<Void> terminateTaskExecutor(int index) {
+ /**
+ * Terminates a TaskManager with the given index.
+ *
+ * <p>See {@link #startTaskManager()} to understand how TaskManagers
are indexed.
+ * This method terminates a TaskManager with a given index but it does
not clear the index.
+ * The index stays occupied for the lifetime of the MiniCluster and its
TaskManager stays terminated.
+ * The index is not reused if more TaskManagers are started with {@link
#startTaskManager()}.
+ *
+ * @param index index of the TaskManager to terminate
+ * @return {@link CompletableFuture} of the given TaskManager
termination
+ */
+ public CompletableFuture<Void> terminateTaskManager(int index) {
synchronized (lock) {
final TaskExecutor taskExecutor =
taskManagers.get(index);
return taskExecutor.closeAsync();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index affd334..5f3c46c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -65,17 +64,6 @@ public class TestingMiniCluster extends MiniCluster {
this(miniClusterConfiguration, null);
}
- @Nonnull
- @Override
- public CompletableFuture<Void> terminateTaskExecutor(int index) {
- return super.terminateTaskExecutor(index);
- }
-
- @Override
- public void startTaskExecutor() throws Exception {
- super.startTaskExecutor();
- }
-
@Override
protected boolean useLocalCommunication() {
return localCommunication;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index f7c882b..6b5a841 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
@@ -64,16 +64,15 @@ public class TaskExecutorITCase extends TestLogger {
private static final int SLOTS_PER_TM = 2;
private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
- private TestingMiniCluster miniCluster;
+ private MiniCluster miniCluster;
@Before
public void setup() throws Exception {
- miniCluster = new TestingMiniCluster(
- new TestingMiniClusterConfiguration.Builder()
+ miniCluster = new MiniCluster(
+ new MiniClusterConfiguration.Builder()
.setNumTaskManagers(NUM_TMS)
.setNumSlotsPerTaskManager(SLOTS_PER_TM)
- .build(),
- null);
+ .build());
miniCluster.start();
}
@@ -96,13 +95,13 @@ public class TaskExecutorITCase extends TestLogger {
final CompletableFuture<JobResult> jobResultFuture =
submitJobAndWaitUntilRunning(jobGraph);
// kill one TaskExecutor which should fail the job execution
- miniCluster.terminateTaskExecutor(0);
+ miniCluster.terminateTaskManager(0);
final JobResult jobResult = jobResultFuture.get();
assertThat(jobResult.isSuccess(), is(false));
- miniCluster.startTaskExecutor();
+ miniCluster.startTaskManager();
final JobGraph newJobGraph = createJobGraph(PARALLELISM);
BlockingOperator.unblock();
@@ -121,9 +120,9 @@ public class TaskExecutorITCase extends TestLogger {
final CompletableFuture<JobResult> jobResultFuture =
submitJobAndWaitUntilRunning(jobGraph);
// start an additional TaskExecutor
- miniCluster.startTaskExecutor();
+ miniCluster.startTaskManager();
- miniCluster.terminateTaskExecutor(0).get(); // this should fail
the job
+ miniCluster.terminateTaskManager(0).get(); // this should fail
the job
BlockingOperator.unblock();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index 028a9d0..024532e 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -34,9 +34,7 @@ import
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import
org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration.Builder;
+import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -52,6 +50,8 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
@@ -64,6 +64,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,7 +166,16 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
private static final List<Long> EXPECTED_JOB_OUTPUT =
LongStream.range(MAP_NUMBER, EMITTED_RECORD_NUMBER +
MAP_NUMBER).boxed().collect(Collectors.toList());
- private static TestingMiniCluster miniCluster;
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new
MiniClusterResource(
+ new MiniClusterResourceConfiguration
+ .Builder()
+ .setConfiguration(createConfiguration())
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ private static MiniCluster miniCluster;
private static MiniClusterClient client;
@@ -178,19 +188,7 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
@SuppressWarnings("OverlyBroadThrowsClause")
@Before
public void setup() throws Exception {
- Configuration configuration = new Configuration();
-
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
PIPELINED_REGION_RESTART_STRATEGY_NAME);
-
- miniCluster = new TestingMiniCluster(
- new Builder()
- .setNumTaskManagers(1)
- .setNumSlotsPerTaskManager(1)
- .setConfiguration(configuration)
-
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .build(),
- null);
- miniCluster.start();
-
+ miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
client = new MiniClusterClient(miniCluster);
lastTaskManagerIndexInMiniCluster = new AtomicInteger(0);
@@ -200,10 +198,6 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
@After
public void teardown() throws Exception {
- if (miniCluster != null) {
- miniCluster.close();
- }
-
if (client != null) {
client.close();
}
@@ -224,6 +218,12 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
failureTracker.verify(getMapperAttempts());
}
+ private static Configuration createConfiguration() {
+ Configuration configuration = new Configuration();
+
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
PIPELINED_REGION_RESTART_STRATEGY_NAME);
+ return configuration;
+ }
+
private static FailureStrategy createFailureStrategy(int trackingIndex)
{
int failWithExceptionAfterNumberOfProcessedRecords =
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
int failTaskExecutorAfterNumberOfProcessedRecords =
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
@@ -250,9 +250,9 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
private static void restartTaskManager() throws Exception {
int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
try {
- miniCluster.terminateTaskExecutor(tmi).get();
+ miniCluster.terminateTaskManager(tmi).get();
} finally {
- miniCluster.startTaskExecutor();
+ miniCluster.startTaskManager();
}
}
@@ -525,7 +525,7 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
private final ExecutorService executorService;
private final URI restAddress;
- private MiniClusterClient(TestingMiniCluster miniCluster)
throws ConfigurationException {
+ private MiniClusterClient(MiniCluster miniCluster) throws
ConfigurationException {
restAddress = miniCluster.getRestAddress().join();
executorService =
Executors.newSingleThreadScheduledExecutor(new
ExecutorThreadFactory("Flink-RestClient-IO"));
restClient = createRestClient();