[
https://issues.apache.org/jira/browse/FLINK-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649932#comment-16649932
]
ASF GitHub Bot commented on FLINK-10530:
----------------------------------------
tillrohrmann closed pull request #6827: [FLINK-10530][tests] Harden
ProcessFailureCancelingITCase and AbstractTaskManagerProcessFailureRecovery
URL: https://github.com/apache/flink/pull/6827
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub does not show the original diff of a merged pull request
from a fork, the diff is reproduced below for the sake of provenance:
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5d7f26bb886..83298aa78ec 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -18,18 +18,19 @@
package org.apache.flink.test.recovery;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.BlobServerResource;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
@@ -42,6 +43,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
@@ -76,6 +79,9 @@
@Rule
public final BlobServerResource blobServerResource = new
BlobServerResource();
+ @Rule
+ public final ZooKeeperResource zooKeeperResource = new
ZooKeeperResource();
+
@Test
public void testTaskManagerProcessFailure() throws Exception {
@@ -89,18 +95,19 @@ public void testTaskManagerProcessFailure() throws
Exception {
File coordinateTempDir = null;
- final int jobManagerPort = NetUtils.getAvailablePort();
- final int restPort = NetUtils.getAvailablePort();
-
- Configuration jmConfig = new Configuration();
- jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
- jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
- jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
- jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
500L);
- jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT,
10000L);
- jmConfig.setInteger(RestOptions.PORT, restPort);
-
- try (final StandaloneSessionClusterEntrypoint clusterEntrypoint
= new StandaloneSessionClusterEntrypoint(jmConfig)) {
+ Configuration config = new Configuration();
+ config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
500L);
+ config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT,
10000L);
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperResource.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getAbsolutePath());
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+
+ try (final StandaloneSessionClusterEntrypoint clusterEntrypoint
= new StandaloneSessionClusterEntrypoint(config)) {
// check that we run this test only if the java command
// is available on this machine
String javaCommand = getJavaCommandPath();
@@ -119,21 +126,28 @@ public void testTaskManagerProcessFailure() throws
Exception {
clusterEntrypoint.startCluster();
+ final Map<String, String> keyValues = config.toMap();
+ final ArrayList<String> commands = new
ArrayList<>((keyValues.size() << 1) + 8);
+
// the TaskManager java command
- String[] command = new String[] {
- javaCommand,
- "-Dlog.level=DEBUG",
- "-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath(),
- "-Xms80m", "-Xmx80m",
- "-classpath", getCurrentClasspath(),
-
TaskExecutorProcessEntryPoint.class.getName(),
- String.valueOf(jobManagerPort)
- };
+ commands.add(javaCommand);
+ commands.add("-Dlog.level=DEBUG");
+ commands.add("-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath());
+ commands.add("-Xms80m");
+ commands.add("-Xmx80m");
+ commands.add("-classpath");
+ commands.add(getCurrentClasspath());
+
commands.add(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName());
+
+ for (Map.Entry<String, String> keyValue:
keyValues.entrySet()) {
+ commands.add("--" + keyValue.getKey());
+ commands.add(keyValue.getValue());
+ }
// start the first two TaskManager processes
- taskManagerProcess1 = new
ProcessBuilder(command).start();
+ taskManagerProcess1 = new
ProcessBuilder(commands).start();
new
CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(),
processOutput1);
- taskManagerProcess2 = new
ProcessBuilder(command).start();
+ taskManagerProcess2 = new
ProcessBuilder(commands).start();
new
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(),
processOutput2);
// the program will set a marker file in each of its
parallel tasks once they are ready, so that
@@ -148,7 +162,7 @@ public void testTaskManagerProcessFailure() throws
Exception {
@Override
public void run() {
try {
-
testTaskManagerFailure(restPort, coordinateDirClosure);
+ testTaskManagerFailure(config,
coordinateDirClosure);
}
catch (Throwable t) {
t.printStackTrace();
@@ -176,7 +190,7 @@ public void run() {
}
// start the third TaskManager
- taskManagerProcess3 = new
ProcessBuilder(command).start();
+ taskManagerProcess3 = new
ProcessBuilder(commands).start();
new
CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(),
processOutput3);
// kill one of the previous TaskManagers, triggering a
failure and recovery
@@ -232,11 +246,11 @@ public void run() {
* The test program should be implemented here in a form of a separate
thread.
* This provides a solution for checking that it has been terminated.
*
- * @param jobManagerPort The port for submitting the topology to the
local cluster
+ * @param configuration the config to use
* @param coordinateDir TaskManager failure will be triggered only
after processes
* have successfully created file under
this directory
*/
- public abstract void testTaskManagerFailure(int jobManagerPort, File
coordinateDir) throws Exception;
+ public abstract void testTaskManagerFailure(Configuration
configuration, File coordinateDir) throws Exception;
protected static void printProcessLog(String processName, String log) {
if (log == null || log.length() == 0) {
@@ -306,15 +320,8 @@ protected static boolean waitForMarkerFiles(File basedir,
String prefix, int num
public static void main(String[] args) {
try {
- int jobManagerPort = Integer.parseInt(args[0]);
-
- Configuration cfg = new Configuration();
- cfg.setString(JobManagerOptions.ADDRESS,
"localhost");
- cfg.setInteger(JobManagerOptions.PORT,
jobManagerPort);
-
cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
- cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+ final ParameterTool parameterTool =
ParameterTool.fromArgs(args);
+ Configuration cfg =
parameterTool.getConfiguration();
TaskManagerRunner.runTaskManager(cfg,
ResourceID.generate());
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index afca8f12100..2e39bafe67f 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -30,10 +30,12 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
@@ -53,16 +55,20 @@
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.StringWriter;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -84,6 +90,12 @@
@Rule
public final BlobServerResource blobServerResource = new
BlobServerResource();
+ @Rule
+ public final ZooKeeperResource zooKeeperResource = new
ZooKeeperResource();
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Test
public void testCancelingOnProcessFailure() throws Exception {
final StringWriter processOutput = new StringWriter();
@@ -93,23 +105,30 @@ public void testCancelingOnProcessFailure() throws
Exception {
Process taskManagerProcess = null;
final TestingFatalErrorHandler fatalErrorHandler = new
TestingFatalErrorHandler();
- Configuration jmConfig = new Configuration();
- jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
- jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
- jmConfig.setInteger(RestOptions.PORT, 0);
-
- final RpcService rpcService =
AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+ Configuration config = new Configuration();
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperResource.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getAbsolutePath());
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+
+ final RpcService rpcService =
AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
final int jobManagerPort = rpcService.getPort();
- jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+ config.setInteger(JobManagerOptions.PORT, jobManagerPort);
final SessionDispatcherResourceManagerComponentFactory
resourceManagerComponentFactory = new
SessionDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE);
DispatcherResourceManagerComponent<?>
dispatcherResourceManagerComponent = null;
- try (final HighAvailabilityServices haServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
- jmConfig,
- TestingUtils.defaultExecutor(),
-
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
+ final HighAvailabilityServices haServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
+ try {
// check that we run this test only if the java command
// is available on this machine
@@ -125,7 +144,7 @@ public void testCancelingOnProcessFailure() throws
Exception {
CommonTestUtils.printLog4jDebugConfig(tempLogFile);
dispatcherResourceManagerComponent =
resourceManagerComponentFactory.create(
- jmConfig,
+ config,
rpcService,
haServices,
blobServerResource.getBlobServer(),
@@ -134,26 +153,26 @@ public void testCancelingOnProcessFailure() throws
Exception {
new MemoryArchivedExecutionGraphStore(),
fatalErrorHandler);
- // update the rest ports
- final int restPort = dispatcherResourceManagerComponent
- .getWebMonitorEndpoint()
- .getServerAddress()
- .getPort();
- jmConfig.setInteger(RestOptions.PORT, restPort);
+ final Map<String, String> keyValues = config.toMap();
+ final ArrayList<String> commands = new
ArrayList<>((keyValues.size() << 1) + 8);
// the TaskManager java command
- String[] command = new String[] {
- javaCommand,
- "-Dlog.level=DEBUG",
- "-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath(),
- "-Xms80m", "-Xmx80m",
- "-classpath", getCurrentClasspath(),
-
AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
- String.valueOf(jobManagerPort)
- };
+ commands.add(javaCommand);
+ commands.add("-Dlog.level=DEBUG");
+ commands.add("-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath());
+ commands.add("-Xms80m");
+ commands.add("-Xmx80m");
+ commands.add("-classpath");
+ commands.add(getCurrentClasspath());
+
commands.add(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName());
+
+ for (Map.Entry<String, String> keyValue:
keyValues.entrySet()) {
+ commands.add("--" + keyValue.getKey());
+ commands.add(keyValue.getValue());
+ }
// start the first two TaskManager processes
- taskManagerProcess = new
ProcessBuilder(command).start();
+ taskManagerProcess = new
ProcessBuilder(commands).start();
new
CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(),
processOutput);
final Throwable[] errorRef = new Throwable[1];
@@ -163,7 +182,7 @@ public void testCancelingOnProcessFailure() throws
Exception {
@Override
public void run() {
try {
- ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new
Configuration());
+ ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, config);
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
@@ -196,16 +215,10 @@ public Long map(Long value) throws Exception {
// kill the TaskManager
programThread.start();
- final LeaderConnectionInfo leaderConnectionInfo =
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(),
Time.seconds(10L));
-
- final DispatcherGateway dispatcherGateway =
rpcService.connect(
- leaderConnectionInfo.getAddress(),
-
DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
- DispatcherGateway.class).get();
-
+ final DispatcherGateway dispatcherGateway =
retrieveDispatcherGateway(rpcService, haServices);
waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
- clusterClient = new RestClusterClient<>(jmConfig,
"standalone");
+ clusterClient = new RestClusterClient<>(config,
"standalone");
final Collection<JobID> jobIds =
waitForRunningJobs(clusterClient, timeout);
@@ -252,12 +265,31 @@ public Long map(Long value) throws Exception {
dispatcherResourceManagerComponent.close();
}
+ haServices.closeAndCleanupAllData();
+
fatalErrorHandler.rethrowError();
RpcUtils.terminateRpcService(rpcService,
Time.seconds(10L));
}
}
+ /**
+ * Helper method to wait until the {@link Dispatcher} has set its
fencing token.
+ *
+ * @param rpcService to use to connect to the dispatcher
+ * @param haServices high availability services to connect to the
dispatcher
+ * @return {@link DispatcherGateway}
+ * @throws Exception if something goes wrong
+ */
+ static DispatcherGateway retrieveDispatcherGateway(RpcService
rpcService, HighAvailabilityServices haServices) throws Exception {
+ final LeaderConnectionInfo leaderConnectionInfo =
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(),
Time.seconds(10L));
+
+ return rpcService.connect(
+ leaderConnectionInfo.getAddress(),
+
DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+ DispatcherGateway.class).get();
+ }
+
private void waitUntilAllSlotsAreUsed(DispatcherGateway
dispatcherGateway, Time timeout) throws ExecutionException,
InterruptedException {
FutureUtils.retrySuccesfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(timeout),
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 4815c4938f7..473fb3959f8 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -64,10 +64,9 @@ public
TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode)
//
--------------------------------------------------------------------------------------------
@Override
- public void testTaskManagerFailure(int jobManagerPort, final File
coordinateDir) throws Exception {
+ public void testTaskManagerFailure(Configuration configuration, final
File coordinateDir) throws Exception {
- final Configuration configuration = new Configuration();
- ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort,
configuration);
+ ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1,
0L));
env.getConfig().setExecutionMode(executionMode);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index fbf6b5b71e3..8ccb311be1c 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -61,14 +61,13 @@
private static final int DATA_COUNT = 10000;
@Override
- public void testTaskManagerFailure(int jobManagerPort, final File
coordinateDir) throws Exception {
+ public void testTaskManagerFailure(Configuration configuration, final
File coordinateDir) throws Exception {
final File tempCheckpointDir = tempFolder.newFolder();
- final Configuration configuration = new Configuration();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(
"localhost",
- jobManagerPort,
+ 1337, // not needed since we use ZooKeeper
configuration);
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> ProcessFailureCancelingITCase.testCancelingOnProcessFailure failed on Travis.
> -----------------------------------------------------------------------------
>
> Key: FLINK-10530
> URL: https://issues.apache.org/jira/browse/FLINK-10530
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Kostas Kloudas
> Assignee: Till Rohrmann
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
>
> The logs from Travis: https://api.travis-ci.org/v3/job/440109944/log.txt
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)