This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new ae47ba9 [FLINK-19863][test] Fix SQLClientHBaseITCase.testHBase failed with process timeout
ae47ba9 is described below
commit ae47ba9613ee6f6a5b476026f9d9c04df8ef5632
Author: Leonard Xu <[email protected]>
AuthorDate: Fri Dec 4 21:10:44 2020 +0800
[FLINK-19863][test] Fix SQLClientHBaseITCase.testHBase failed with process timeout
This closes #14274
---
.../tests/util/kafka/SQLClientKafkaITCase.java | 14 ++++++------
.../tests/util/kafka/StreamingKafkaITCase.java | 25 ++++++++++++----------
.../flink/tests/util/flink/ClusterController.java | 7 ++++--
.../flink/tests/util/flink/FlinkDistribution.java | 9 ++++----
.../flink/tests/util/flink/FlinkResource.java | 3 ++-
.../util/flink/LocalStandaloneFlinkResource.java | 9 ++++----
.../tests/util/hbase/SQLClientHBaseITCase.java | 12 ++++++-----
7 files changed, 46 insertions(+), 33 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index d33efdb..4509c28 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -170,12 +170,14 @@ public class SQLClientKafkaITCase extends TestLogger {
private void executeSqlStatements(ClusterController clusterController,
List<String> sqlLines) throws IOException {
LOG.info("Executing Kafka {} end-to-end SQL statements.",
kafkaSQLVersion);
- clusterController.submitSQLJob(new
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
- .addJar(sqlAvroJar)
- .addJars(apacheAvroJars)
- .addJar(sqlConnectorKafkaJar)
- .addJar(sqlToolBoxJar)
- .build());
+ clusterController.submitSQLJob(
+ new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+ .addJar(sqlAvroJar)
+ .addJars(apacheAvroJars)
+ .addJar(sqlConnectorKafkaJar)
+ .addJar(sqlToolBoxJar)
+ .build(),
+ Duration.ofMinutes(2L));
}
private List<String> initializeSqlLines(Map<String, String> vars)
throws IOException {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index a52414c..6c149d7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -97,17 +98,19 @@ public class StreamingKafkaITCase extends TestLogger {
kafka.createTopic(1, 1, outputTopic);
// run the Flink job (detached mode)
- clusterController.submitJob(new
JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
- .setDetached(true)
- .addArgument("--input-topic", inputTopic)
- .addArgument("--output-topic", outputTopic)
- .addArgument("--prefix", "PREFIX")
- .addArgument("--bootstrap.servers",
kafka.getBootstrapServerAddresses().stream().map(address ->
address.getHostString() + ':' +
address.getPort()).collect(Collectors.joining(",")))
- .addArgument("--group.id", "myconsumer")
- .addArgument("--auto.offset.reset", "earliest")
- .addArgument("--transaction.timeout.ms",
"900000")
-
.addArgument("--flink.partition-discovery.interval-millis", "1000")
- .build());
+ clusterController.submitJob(
+ new
JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
+ .setDetached(true)
+ .addArgument("--input-topic",
inputTopic)
+ .addArgument("--output-topic",
outputTopic)
+ .addArgument("--prefix", "PREFIX")
+ .addArgument("--bootstrap.servers",
kafka.getBootstrapServerAddresses().stream().map(address ->
address.getHostString() + ':' +
address.getPort()).collect(Collectors.joining(",")))
+ .addArgument("--group.id", "myconsumer")
+ .addArgument("--auto.offset.reset",
"earliest")
+
.addArgument("--transaction.timeout.ms", "900000")
+
.addArgument("--flink.partition-discovery.interval-millis", "1000")
+ .build(),
+ Duration.ofMinutes(2L));
LOG.info("Sending messages to Kafka topic [{}] ...",
inputTopic);
// send some data to Kafka
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
index 9f86306..fe7dbf4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
@@ -21,6 +21,7 @@ package org.apache.flink.tests.util.flink;
import org.apache.flink.util.AutoCloseableAsync;
import java.io.IOException;
+import java.time.Duration;
/**
* Controller for interacting with a cluster.
@@ -31,16 +32,18 @@ public interface ClusterController extends
AutoCloseableAsync {
* Submits the given job to the cluster.
*
* @param job job to submit
+ * @param timeout the maximum time to wait.
* @return JobController for the submitted job
* @throws IOException
*/
- JobController submitJob(JobSubmission job) throws IOException;
+ JobController submitJob(JobSubmission job, Duration timeout) throws
IOException;
/**
* Submits the given SQL job to the cluster.
*
* @param job job to submit.
+ * @param timeout the maximum time to wait.
* @throws IOException if any IO error happen.
*/
- void submitSQLJob(SQLJobSubmission job) throws IOException;
+ void submitSQLJob(SQLJobSubmission job, Duration timeout) throws
IOException;
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 07c910e..76f7213 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -45,6 +45,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -151,7 +152,7 @@ final class FlinkDistribution {
AutoClosableProcess.runBlocking(bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
}
- public JobID submitJob(final JobSubmission jobSubmission) throws
IOException {
+ public JobID submitJob(final JobSubmission jobSubmission, Duration
timeout) throws IOException {
final List<String> commands = new ArrayList<>(4);
commands.add(bin.resolve("flink").toString());
commands.add("run");
@@ -190,14 +191,14 @@ final class FlinkDistribution {
}
try {
- return
JobID.fromHexString(rawJobIdFuture.get(1, TimeUnit.MINUTES));
+ return
JobID.fromHexString(rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
} catch (Exception e) {
throw new IOException("Could not determine Job
ID.", e);
}
}
}
- public void submitSQLJob(SQLJobSubmission job) throws IOException {
+ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws
IOException {
final List<String> commands = new ArrayList<>();
commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
commands.add("embedded");
@@ -218,7 +219,7 @@ final class FlinkDistribution {
.create(commands.toArray(new String[0]))
.setStdInputs(job.getSqlLines().toArray(new String[0]))
.setStdoutProcessor(LOG::info) // logging the SQL
statements and error message
- .runBlocking();
+ .runBlocking(timeout);
}
public void performJarOperation(JarOperation operation) throws
IOException {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
index 0e285f2..6e1506a 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
@@ -22,6 +22,7 @@ import org.apache.flink.tests.util.util.FactoryUtils;
import org.apache.flink.util.ExternalResource;
import java.io.IOException;
+import java.time.Duration;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,7 +39,7 @@ public interface FlinkResource extends ExternalResource {
* <p>The exact constellation of the cluster is undefined.
*
* <p>In the case of per-job clusters this method may not start any
Flink processes, deferring this to
- * {@link ClusterController#submitJob(JobSubmission)}.
+ * {@link ClusterController#submitJob(JobSubmission, Duration)}.
*
* @return controller for interacting with the cluster
* @throws IOException
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 55ef08c..82ba5ff 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -185,15 +186,15 @@ public class LocalStandaloneFlinkResource implements
FlinkResource {
}
@Override
- public JobController submitJob(JobSubmission job) throws
IOException {
- final JobID run = distribution.submitJob(job);
+ public JobController submitJob(JobSubmission job, Duration
timeout) throws IOException {
+ final JobID run = distribution.submitJob(job, timeout);
return new StandaloneJobController(run);
}
@Override
- public void submitSQLJob(SQLJobSubmission job) throws
IOException {
- distribution.submitSQLJob(job);
+ public void submitSQLJob(SQLJobSubmission job, Duration
timeout) throws IOException {
+ distribution.submitSQLJob(job, timeout);
}
@Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index ca603f4..1bd51b3 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -212,10 +212,12 @@ public class SQLClientHBaseITCase extends TestLogger {
private void executeSqlStatements(ClusterController clusterController,
List<String> sqlLines) throws IOException {
LOG.info("Executing SQL: HBase source table -> HBase sink
table");
- clusterController.submitSQLJob(new
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
- .addJar(sqlToolBoxJar)
- .addJar(sqlConnectorHBaseJar)
- .addJars(hadoopClasspathJars)
- .build());
+ clusterController.submitSQLJob(
+ new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+ .addJar(sqlToolBoxJar)
+ .addJar(sqlConnectorHBaseJar)
+ .addJars(hadoopClasspathJars)
+ .build(),
+ Duration.ofMinutes(2L));
}
}