[
https://issues.apache.org/jira/browse/BEAM-9345?focusedWorklogId=392039&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392039
]
ASF GitHub Bot logged work on BEAM-9345:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Feb/20 19:56
Start Date: 24/Feb/20 19:56
Worklog Time Spent: 10m
Work Description: ibzib commented on pull request #10950: [BEAM-9345] Add
end-to-end Flink job submission test
URL: https://github.com/apache/beam/pull/10950#discussion_r383479578
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.file.Files;
+import java.security.Permission;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+/** End-to-end submission test of Beam jobs on a Flink cluster. */
+public class FlinkSubmissionTest {
+
@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

// Snapshot of the process environment taken at class-initialization time;
// restoreEnvironment() writes this snapshot back after the tests (see modifyEnv).
private static final Map<String, String> ENV = System.getenv();

// SecurityManager installed before any test replaced it to intercept System.exit.
// NOTE(review): presumably restored by restoreDefaultSystemExitBehavior(), which
// is not visible in this diff hunk — confirm.
private static final SecurityManager SECURITY_MANAGER = System.getSecurityManager();

/** Flink cluster that runs over the lifespan of the tests. */
private static transient RemoteMiniCluster flinkCluster;

/** Each test has a timeout of 60 seconds (for safety). */
@Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);

/** Counter which keeps track of the number of jobs submitted. */
private static int expectedNumberOfJobs;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ Configuration config = new Configuration();
+ // Avoid port collision in parallel tests on the same machine
+ config.setInteger(RestOptions.PORT.key(), 0);
+
+ MiniClusterConfiguration clusterConfig =
+ new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(1)
+ // Create a shared actor system for all cluster services
+ .setRpcServiceSharing(RpcServiceSharing.SHARED)
+ .build();
+
+ flinkCluster = new RemoteMiniClusterImpl(clusterConfig);
+ flinkCluster.start();
+ prepareEnvironment();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ restoreEnvironment();
+ flinkCluster.close();
+ flinkCluster = null;
+ }
+
+ @Test
+ public void testSubmission() throws Exception {
+ runSubmission(false);
+ }
+
+ @Test
+ public void testDetachedSubmission() throws Exception {
+ runSubmission(true);
+ }
+
+ private void runSubmission(boolean detached) throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setTempLocation(TEMP_FOLDER.getRoot().getPath());
+ String jarPath =
+ Iterables.getFirst(
+
PipelineResources.detectClassPathResourcesToStage(getClass().getClassLoader(),
options),
+ null);
+
+ try {
+ throwExceptionOnSystemExit();
+ ImmutableList.Builder<String> argsBuilder = ImmutableList.builder();
+ argsBuilder.add("run").add("-c").add(getClass().getName());
+ if (detached) {
+ argsBuilder.add("-d");
+ }
+ argsBuilder.add(jarPath);
+
+ expectedNumberOfJobs++;
+ // Run end-to-end test
+ CliFrontend.main(argsBuilder.build().toArray(new String[0]));
+ } catch (SystemExitException e) {
+ // The Clifrontend exited and we can move on to check if the job has
finished
+ } finally {
+ restoreDefaultSystemExitBehavior();
+ }
+
+ waitUntilJobIsCompleted();
+ }
+
+ /** The Flink program which is executed by the CliFrontend. */
+ public static void main(String[] args) {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setParallelism(1);
+ Pipeline p = Pipeline.create(options);
+ p.apply(GenerateSequence.from(0).to(1));
+ p.run();
+ }
+
+ private static void prepareEnvironment() throws Exception {
+ // Write a Flink config
+ File file = TEMP_FOLDER.newFile("flink-conf.yaml");
+ String config =
+ String.format(
+ "%s: %s\n%s: %s\n%s: %s",
+ JobManagerOptions.ADDRESS.key(),
+ "localhost",
+ JobManagerOptions.PORT.key(),
+ flinkCluster.getClusterPort(),
+ RestOptions.PORT.key(),
+ flinkCluster.getRestPort());
+ Files.write(file.toPath(), config.getBytes(Charsets.UTF_8));
+
+ // Create a new environment with the location of the Flink config for
Clifrontend
+ ImmutableMap<String, String> newEnv =
+ ImmutableMap.<String, String>builder()
+ .putAll(ENV.entrySet())
+ .put(ConfigConstants.ENV_FLINK_CONF_DIR, file.getParent())
+ .build();
+
+ modifyEnv(newEnv);
+ }
+
/** Re-installs the environment snapshot captured in {@link #ENV} before any test ran. */
private static void restoreEnvironment() throws Exception {
  modifyEnv(ENV);
}
+
+ private static void modifyEnv(Map<String, String> env) throws Exception {
+ Class processEnv = Class.forName("java.lang.ProcessEnvironment");
+ Field envField = processEnv.getDeclaredField("theUnmodifiableEnvironment");
+
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(envField, envField.getModifiers() & ~Modifier.FINAL);
+
+ envField.setAccessible(true);
+ envField.set(null, env);
+ envField.setAccessible(false);
+
+ modifiersField.setInt(envField, envField.getModifiers() & Modifier.FINAL);
+ modifiersField.setAccessible(false);
+ }
+
+ private void waitUntilJobIsCompleted() throws Exception {
+ while (true) {
+ Collection<JobStatusMessage> allJobsStates =
flinkCluster.listJobs().get();
+ assertThat(
+ "There should be a job per test run.", allJobsStates.size(),
is(expectedNumberOfJobs));
+ if (allJobsStates.stream()
+ .allMatch(jobStatus -> jobStatus.getJobState() ==
JobStatus.FINISHED)) {
+ return;
+ }
+ Thread.sleep(50);
+ }
+ }
+
+ /** Prevents the CliFrontend from calling System.exit. */
+ private static void throwExceptionOnSystemExit() {
+ System.setSecurityManager(
+ new SecurityManager() {
Review comment:
Instead of creating an all-new SecurityManager, should we instead wrap the
original one? (Not sure if it actually matters.)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 392039)
Time Spent: 20m (was: 10m)
> "Multiple environments cannot be created in detached mode"
> ----------------------------------------------------------
>
> Key: BEAM-9345
> URL: https://issues.apache.org/jira/browse/BEAM-9345
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Kyle Weaver
> Assignee: Maximilian Michels
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Workarounds.restoreOriginalStdOutAndStdErrIfApplicable throws exception when
> running in standalone session cluster. Since FLINK-15504 is resolved in Flink
> 1.10, maybe we can remove the workaround in Beam?
> From user@
> (https://lists.apache.org/thread.html/r1fb9456055ae51f998be67531f0ac55d3da9cf2647238b991eee7f97%40%3Cuser.beam.apache.org%3E):
> I am trying to upgrade from a Flink session cluster 1.8 to 1.9 and from Beam
> 2.16.0 to 2.19.0.
> Everything went quite smoothly, the local runner and the local Flink runner
> work flawlessly.
> However when I:
> 1. Generate a Beam jar for the FlinkRunner via maven (mvn package
> -PFlinkRunner)
> 2. Glue that into a Flink 1.9 docker image
> 3. Start the image as a Standalone Session Cluster
> When I try to launch the first pipeline I get the following exception
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to construct instance from factory method
> FlinkRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: java.lang.RuntimeException: Failed to construct instance from
> factory method FlinkRunner#fromOptions(interface
> org.apache.beam.sdk.options.PipelineOptions)
> at
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
> at
> org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
> at
> org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> ... 9 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
> ... 19 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Multiple
> environments cannot be created in detached mode
> at
> org.apache.flink.client.program.ContextEnvironmentFactory.createExecutionEnvironment(ContextEnvironmentFactory.java:67)
> at java.util.Optional.map(Optional.java:215)
> at
> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment(ExecutionEnvironment.java:1068)
> at
> org.apache.beam.runners.flink.translation.utils.Workarounds.restoreOriginalStdOutAndStdErrIfApplicable(Workarounds.java:43)
> at
> org.apache.beam.runners.flink.FlinkRunner.<init>(FlinkRunner.java:96)
> at
> org.apache.beam.runners.flink.FlinkRunner.fromOptions(FlinkRunner.java:90)
> ... 24 more
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)