This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1839fa5 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout
to configured Akka ask timeout
1839fa5 is described below
commit 1839fa57a91723f8ef10bcbd2c271366b5509b0b
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jun 8 16:05:51 2020 +0200
[FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured
Akka ask timeout
This commit hardens all CancelingTestBase tests by using the configured
Akka ask timeout of 200s as the rpc timeout.
This closes #12531.
---
.../apache/flink/test/cancelling/CancelingTestBase.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index b3905c0..03bc6eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger {
protected static final int PARALLELISM = 4;
- protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000
milliseconds
+ private static final Configuration configuration = getConfiguration();
//
--------------------------------------------------------------------------------------------
@ClassRule
public static final MiniClusterWithClientResource CLUSTER = new
MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfiguration())
+ .setConfiguration(configuration)
.setNumberTaskManagers(2)
.setNumberSlotsPerTaskManager(4)
.build());
@@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
+ final long rpcTimeout =
AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
+
ClusterClient<?> client = CLUSTER.getClusterClient();
JobSubmissionResult jobSubmissionResult =
ClientUtils.submitJob(client, jobGraph);
Deadline submissionDeadLine = new FiniteDuration(2,
TimeUnit.MINUTES).fromNow();
- JobStatus jobStatus =
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT,
TimeUnit.MILLISECONDS);
+ JobStatus jobStatus =
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout,
TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING &&
submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
- jobStatus =
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT,
TimeUnit.MILLISECONDS);
+ jobStatus =
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout,
TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
@@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends
TestLogger {
Deadline cancelDeadline = new
FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
- JobStatus jobStatusAfterCancel =
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT,
TimeUnit.MILLISECONDS);
+ JobStatus jobStatusAfterCancel =
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout,
TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED &&
cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
- jobStatusAfterCancel =
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT,
TimeUnit.MILLISECONDS);
+ jobStatusAfterCancel =
client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout,
TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
Assert.fail("Failed to cancel job with ID " +
jobSubmissionResult.getJobID() + '.');