This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eb1ee9986b9ecc282f9fca2bb0f95a1544759955
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Dec 14 16:02:03 2021 +0100

    [FLINK-25266][e2e] Support job jar submission with FlinkContainers
---
 .../util/flink/container/FlinkContainers.java      | 53 ++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
index 5d4fa8e..38c82ae 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.flink.container;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -29,6 +30,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWith
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.tests.util.flink.JobSubmission;
 import org.apache.flink.tests.util.flink.SQLJobSubmission;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -57,6 +59,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -258,6 +262,55 @@ public class FlinkContainers implements BeforeAllCallback, 
AfterAllCallback {
         }
     }
 
+    /**
+     * Submits the given job to the cluster.
+     *
+     * @param job job to submit
+     */
+    public JobID submitJob(JobSubmission job) throws IOException, 
InterruptedException {
+        final List<String> commands = new ArrayList<>();
+        commands.add("flink/bin/flink");
+        commands.add("run");
+
+        if (job.isDetached()) {
+            commands.add("-d");
+        }
+        if (job.getParallelism() > 0) {
+            commands.add("-p");
+            commands.add(String.valueOf(job.getParallelism()));
+        }
+        job.getMainClass()
+                .ifPresent(
+                        mainClass -> {
+                            commands.add("--class");
+                            commands.add(mainClass);
+                        });
+        final Path jobJar = job.getJar();
+        final String containerPath = "/tmp/" + jobJar.getFileName();
+        commands.add(containerPath);
+        jobManager.copyFileToContainer(
+                MountableFile.forHostPath(jobJar.toAbsolutePath()), 
containerPath);
+        commands.addAll(job.getArguments());
+
+        LOG.info("Running {}.", commands.stream().collect(Collectors.joining(" 
")));
+
+        // Execute command in JobManager
+        Container.ExecResult execResult =
+                jobManager.execInContainer("bash", "-c", String.join(" ", 
commands));
+
+        final Pattern pattern =
+                job.isDetached()
+                        ? Pattern.compile("Job has been submitted with JobID 
(.*)")
+                        : Pattern.compile("Job with JobID (.*) has finished.");
+
+        final String stdout = execResult.getStdout();
+        LOG.info(stdout);
+        LOG.error(execResult.getStderr());
+        final Matcher matcher = pattern.matcher(stdout);
+        checkState(matcher.find(), "Cannot extract JobID from stdout.");
+        return JobID.fromHexString(matcher.group(1));
+    }
+
     // ------------------------ JUnit 5 lifecycle management 
------------------------
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {

Reply via email to