This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit eb1ee9986b9ecc282f9fca2bb0f95a1544759955 Author: Fabian Paul <[email protected]> AuthorDate: Tue Dec 14 16:02:03 2021 +0100 [FLINK-25266][e2e] Support job jar submission with FlinkContainers --- .../util/flink/container/FlinkContainers.java | 53 ++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java index 5d4fa8e..38c82ae 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java @@ -18,6 +18,7 @@ package org.apache.flink.tests.util.flink.container; +import org.apache.flink.api.common.JobID; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWith import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.tests.util.flink.JobSubmission; import org.apache.flink.tests.util.flink.SQLJobSubmission; import org.apache.flink.util.function.RunnableWithException; @@ -57,6 +59,8 @@ import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -258,6 +262,55 
@@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
         }
     }
 
+    /**
+     * Submits the given job to the cluster by running {@code flink run} inside the JobManager
+     * container.
+     *
+     * <p>The job jar is first copied from the host into {@code /tmp} of the JobManager container.
+     * In detached mode the JobID is read from the "submitted" message on standard output;
+     * otherwise it is read from the "finished" message.
+     *
+     * <p>NOTE: the command parts are joined with single spaces and passed to {@code bash -c}, so
+     * the jar file name and the job arguments are subject to shell word splitting and expansion.
+     *
+     * @param job job to submit
+     * @return the {@link JobID} parsed from the standard output of the submission command
+     * @throws IOException propagated from copying the jar or executing the command in the
+     *     container
+     * @throws InterruptedException propagated from executing the command in the container
+     * @throws IllegalStateException if no JobID can be found in the standard output
+     */
+    public JobID submitJob(JobSubmission job) throws IOException, InterruptedException {
+        final List<String> commands = new ArrayList<>();
+        commands.add("flink/bin/flink");
+        commands.add("run");
+
+        if (job.isDetached()) {
+            commands.add("-d");
+        }
+        if (job.getParallelism() > 0) {
+            commands.add("-p");
+            commands.add(String.valueOf(job.getParallelism()));
+        }
+        job.getMainClass()
+                .ifPresent(
+                        mainClass -> {
+                            commands.add("--class");
+                            commands.add(mainClass);
+                        });
+
+        // The jar must exist inside the container before the CLI can reference it.
+        final Path jobJar = job.getJar();
+        final String containerPath = "/tmp/" + jobJar.getFileName();
+        commands.add(containerPath);
+        jobManager.copyFileToContainer(
+                MountableFile.forHostPath(jobJar.toAbsolutePath()), containerPath);
+        commands.addAll(job.getArguments());
+
+        // Build the command line once so the logged and the executed command cannot diverge.
+        final String commandLine = String.join(" ", commands);
+        LOG.info("Running {}.", commandLine);
+
+        // Execute command in JobManager
+        final Container.ExecResult execResult =
+                jobManager.execInContainer("bash", "-c", commandLine);
+
+        final String stdout = execResult.getStdout();
+        final String stderr = execResult.getStderr();
+        LOG.info(stdout);
+        // Only report stderr when there is something to report; an empty ERROR entry is noise.
+        if (stderr != null && !stderr.isEmpty()) {
+            LOG.error(stderr);
+        }
+
+        // Capture only hex digits so that trailing whitespace or other characters on the same
+        // line never end up in the string handed to JobID.fromHexString.
+        final Pattern pattern =
+                job.isDetached()
+                        ? Pattern.compile("Job has been submitted with JobID ([0-9a-fA-F]+)")
+                        : Pattern.compile("Job with JobID ([0-9a-fA-F]+) has finished\\.");
+        final Matcher matcher = pattern.matcher(stdout);
+        checkState(
+                matcher.find(),
+                "Cannot extract JobID from stdout (exit code " + execResult.getExitCode() + ").");
+        return JobID.fromHexString(matcher.group(1));
+    }
+
     // ------------------------ JUnit 5 lifecycle management ------------------------
 
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {
