This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new ae47ba9  [FLINK-19863][test] Fix SQLClientHBaseITCase.testHBase failed 
with process timeout
ae47ba9 is described below

commit ae47ba9613ee6f6a5b476026f9d9c04df8ef5632
Author: Leonard Xu <[email protected]>
AuthorDate: Fri Dec 4 21:10:44 2020 +0800

    [FLINK-19863][test] Fix SQLClientHBaseITCase.testHBase failed with process 
timeout
    
    This closes #14274
---
 .../tests/util/kafka/SQLClientKafkaITCase.java     | 14 ++++++------
 .../tests/util/kafka/StreamingKafkaITCase.java     | 25 ++++++++++++----------
 .../flink/tests/util/flink/ClusterController.java  |  7 ++++--
 .../flink/tests/util/flink/FlinkDistribution.java  |  9 ++++----
 .../flink/tests/util/flink/FlinkResource.java      |  3 ++-
 .../util/flink/LocalStandaloneFlinkResource.java   |  9 ++++----
 .../tests/util/hbase/SQLClientHBaseITCase.java     | 12 ++++++-----
 7 files changed, 46 insertions(+), 33 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index d33efdb..4509c28 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -170,12 +170,14 @@ public class SQLClientKafkaITCase extends TestLogger {
 
        private void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines) throws IOException {
                LOG.info("Executing Kafka {} end-to-end SQL statements.", 
kafkaSQLVersion);
-               clusterController.submitSQLJob(new 
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
-                       .addJar(sqlAvroJar)
-                       .addJars(apacheAvroJars)
-                       .addJar(sqlConnectorKafkaJar)
-                       .addJar(sqlToolBoxJar)
-                       .build());
+               clusterController.submitSQLJob(
+                       new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                               .addJar(sqlAvroJar)
+                               .addJars(apacheAvroJars)
+                               .addJar(sqlConnectorKafkaJar)
+                               .addJar(sqlToolBoxJar)
+                               .build(),
+                       Duration.ofMinutes(2L));
        }
 
        private List<String> initializeSqlLines(Map<String, String> vars) 
throws IOException {
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index a52414c..6c149d7 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -97,17 +98,19 @@ public class StreamingKafkaITCase extends TestLogger {
                        kafka.createTopic(1, 1, outputTopic);
 
                        // run the Flink job (detached mode)
-                       clusterController.submitJob(new 
JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
-                               .setDetached(true)
-                               .addArgument("--input-topic", inputTopic)
-                               .addArgument("--output-topic", outputTopic)
-                               .addArgument("--prefix", "PREFIX")
-                               .addArgument("--bootstrap.servers", 
kafka.getBootstrapServerAddresses().stream().map(address -> 
address.getHostString() + ':' + 
address.getPort()).collect(Collectors.joining(",")))
-                               .addArgument("--group.id", "myconsumer")
-                               .addArgument("--auto.offset.reset", "earliest")
-                               .addArgument("--transaction.timeout.ms", 
"900000")
-                               
.addArgument("--flink.partition-discovery.interval-millis", "1000")
-                               .build());
+                       clusterController.submitJob(
+                               new 
JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
+                                       .setDetached(true)
+                                       .addArgument("--input-topic", 
inputTopic)
+                                       .addArgument("--output-topic", 
outputTopic)
+                                       .addArgument("--prefix", "PREFIX")
+                                       .addArgument("--bootstrap.servers", 
kafka.getBootstrapServerAddresses().stream().map(address -> 
address.getHostString() + ':' + 
address.getPort()).collect(Collectors.joining(",")))
+                                       .addArgument("--group.id", "myconsumer")
+                                       .addArgument("--auto.offset.reset", 
"earliest")
+                                       
.addArgument("--transaction.timeout.ms", "900000")
+                                       
.addArgument("--flink.partition-discovery.interval-millis", "1000")
+                                       .build(),
+                               Duration.ofMinutes(2L));
 
                        LOG.info("Sending messages to Kafka topic [{}] ...", 
inputTopic);
                        // send some data to Kafka
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
index 9f86306..fe7dbf4 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/ClusterController.java
@@ -21,6 +21,7 @@ package org.apache.flink.tests.util.flink;
 import org.apache.flink.util.AutoCloseableAsync;
 
 import java.io.IOException;
+import java.time.Duration;
 
 /**
  * Controller for interacting with a cluster.
@@ -31,16 +32,18 @@ public interface ClusterController extends 
AutoCloseableAsync {
         * Submits the given job to the cluster.
         *
         * @param job job to submit
+        * @param timeout the maximum time to wait.
         * @return JobController for the submitted job
         * @throws IOException
         */
-       JobController submitJob(JobSubmission job) throws IOException;
+       JobController submitJob(JobSubmission job, Duration timeout) throws 
IOException;
 
        /**
         * Submits the given SQL job to the cluster.
         *
         * @param job job to submit.
+        * @param timeout the maximum time to wait.
         * @throws IOException if any IO error happen.
         */
-       void submitSQLJob(SQLJobSubmission job) throws IOException;
+       void submitSQLJob(SQLJobSubmission job, Duration timeout) throws 
IOException;
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 07c910e..76f7213 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -45,6 +45,7 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -151,7 +152,7 @@ final class FlinkDistribution {
                
AutoClosableProcess.runBlocking(bin.resolve("stop-cluster.sh").toAbsolutePath().toString());
        }
 
-       public JobID submitJob(final JobSubmission jobSubmission) throws 
IOException {
+       public JobID submitJob(final JobSubmission jobSubmission, Duration 
timeout) throws IOException {
                final List<String> commands = new ArrayList<>(4);
                commands.add(bin.resolve("flink").toString());
                commands.add("run");
@@ -190,14 +191,14 @@ final class FlinkDistribution {
                        }
 
                        try {
-                               return 
JobID.fromHexString(rawJobIdFuture.get(1, TimeUnit.MINUTES));
+                               return 
JobID.fromHexString(rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
                        } catch (Exception e) {
                                throw new IOException("Could not determine Job 
ID.", e);
                        }
                }
        }
 
-       public void submitSQLJob(SQLJobSubmission job) throws IOException {
+       public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws 
IOException {
                final List<String> commands = new ArrayList<>();
                
commands.add(bin.resolve("sql-client.sh").toAbsolutePath().toString());
                commands.add("embedded");
@@ -218,7 +219,7 @@ final class FlinkDistribution {
                        .create(commands.toArray(new String[0]))
                        .setStdInputs(job.getSqlLines().toArray(new String[0]))
                        .setStdoutProcessor(LOG::info) // logging the SQL 
statements and error message
-                       .runBlocking();
+                       .runBlocking(timeout);
        }
 
        public void performJarOperation(JarOperation operation) throws 
IOException {
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
index 0e285f2..6e1506a 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
@@ -22,6 +22,7 @@ import org.apache.flink.tests.util.util.FactoryUtils;
 import org.apache.flink.util.ExternalResource;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -38,7 +39,7 @@ public interface FlinkResource extends ExternalResource {
         * <p>The exact constellation of the cluster is undefined.
         *
         * <p>In the case of per-job clusters this method may not start any 
Flink processes, deferring this to
-        * {@link ClusterController#submitJob(JobSubmission)}.
+        * {@link ClusterController#submitJob(JobSubmission, Duration)}.
         *
         * @return controller for interacting with the cluster
         * @throws IOException
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 55ef08c..82ba5ff 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -185,15 +186,15 @@ public class LocalStandaloneFlinkResource implements 
FlinkResource {
                }
 
                @Override
-               public JobController submitJob(JobSubmission job) throws 
IOException {
-                       final JobID run = distribution.submitJob(job);
+               public JobController submitJob(JobSubmission job, Duration 
timeout) throws IOException {
+                       final JobID run = distribution.submitJob(job, timeout);
 
                        return new StandaloneJobController(run);
                }
 
                @Override
-               public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
-                       distribution.submitSQLJob(job);
+               public void submitSQLJob(SQLJobSubmission job, Duration 
timeout) throws IOException {
+                       distribution.submitSQLJob(job, timeout);
                }
 
                @Override
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index ca603f4..1bd51b3 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -212,10 +212,12 @@ public class SQLClientHBaseITCase extends TestLogger {
 
        private void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines) throws IOException {
                LOG.info("Executing SQL: HBase source table -> HBase sink 
table");
-               clusterController.submitSQLJob(new 
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
-                       .addJar(sqlToolBoxJar)
-                       .addJar(sqlConnectorHBaseJar)
-                       .addJars(hadoopClasspathJars)
-                       .build());
+               clusterController.submitSQLJob(
+                       new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                               .addJar(sqlToolBoxJar)
+                               .addJar(sqlConnectorHBaseJar)
+                               .addJars(hadoopClasspathJars)
+                               .build(),
+                       Duration.ofMinutes(2L));
        }
 }

Reply via email to