[
https://issues.apache.org/jira/browse/BEAM-4791?focusedWorklogId=124661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-124661
]
ASF GitHub Bot logged work on BEAM-4791:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jul/18 22:15
Start Date: 18/Jul/18 22:15
Worklog Time Spent: 10m
Work Description: tweise closed pull request #5957: [BEAM-4791] Portable
Flink runner integration test.
URL: https://github.com/apache/beam/pull/5957
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index b3fe63b7874..0a80ad4e04d 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -35,6 +35,7 @@ evaluationDependsOn(":beam-runners-core-java")
test {
systemProperty "log4j.configuration", "log4j-test.properties"
+ //systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
jvmArgs "-XX:-UseGCOverheadLimit"
if (System.getProperty("beamSurefireArgline")) {
jvmArgs System.getProperty("beamSurefireArgline")
@@ -79,6 +80,7 @@ dependencies {
shadowTest "org.apache.flink:flink-runtime_2.11:$flink_version:tests"
shadowTest "org.apache.flink:flink-streaming-java_2.11:$flink_version:tests"
shadowTest "org.apache.flink:flink-test-utils_2.11:$flink_version"
+ shadowTest project(":beam-sdks-java-harness")
validatesRunner project(path: ":beam-sdks-java-core", configuration:
"shadowTest")
validatesRunner project(path: ":beam-runners-core-java", configuration:
"shadowTest")
validatesRunner project(path: project.path, configuration: "shadow")
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
new file mode 100644
index 00000000000..19099f26606
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source;
+import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
+import
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests the execution of a pipeline from specification to execution on the
portable Flink runner.
+ * Exercises job invocation, executable stage translation and deployment with
embedded Flink for
+ * batch and streaming.
+ */
+@RunWith(Parameterized.class)
+public class PortableExecutionTest implements Serializable {
+
+ @Parameters
+ public static Object[] data() {
+ return new Object[] {true, false};
+ }
+
+ @Parameter public boolean isStreaming;
+
+ private transient ListeningExecutorService flinkJobExecutor;
+
+ private DockerJobBundleFactory createJobBundleFactory(JobInfo jobInfo)
throws Exception {
+ return new DockerJobBundleFactory(jobInfo) {
+
+ @Override
+ protected ServerFactory getServerFactory() {
+ return InProcessServerFactory.create();
+ }
+
+ @Override
+ protected EnvironmentFactory getEnvironmentFactory(
+ GrpcFnServer<FnApiControlClientPoolService> controlServer,
+ GrpcFnServer<GrpcLoggingService> loggingServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ Source clientSource,
+ IdGenerator idGenerator) {
+ return InProcessEnvironmentFactory.create(
+ PipelineOptionsFactory.create(), loggingServer, controlServer,
clientSource);
+ }
+ };
+ }
+
+ @Before
+ public void setup() {
+ DockerJobBundleFactory.FACTORY.set(this::createJobBundleFactory);
+ flinkJobExecutor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ }
+
+ @After
+ public void tearDown() {
+ flinkJobExecutor.shutdown();
+ }
+
+ private static ArrayList<KV<String, Iterable<Long>>> outputValues = new
ArrayList<>();
+
+ @Test
+ public void testExecution() throws Exception {
+ Pipeline p = Pipeline.create();
+ p.apply("impulse", Impulse.create())
+ .apply(
+ "create",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @ProcessElement
+ public void process(ProcessContext ctxt) {
+ ctxt.output("zero");
+ ctxt.output("one");
+ ctxt.output("two");
+ }
+ }))
+ .apply(
+ "len",
+ ParDo.of(
+ new DoFn<String, Long>() {
+ @ProcessElement
+ public void process(ProcessContext ctxt) {
+ ctxt.output((long) ctxt.element().length());
+ }
+ }))
+ .apply("addKeys", WithKeys.of("foo"))
+ // Use some unknown coders
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
+ // Force the output to be materialized
+ .apply("gbk", GroupByKey.create())
+ .apply(
+ "collect",
+ ParDo.of(
+ new DoFn<KV<String, Iterable<Long>>, Void>() {
+ @ProcessElement
+ public void process(ProcessContext ctx) {
+ outputValues.add(ctx.element());
+ }
+ }));
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+
+ outputValues.clear();
+ // execute the pipeline
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setFlinkMaster("[local]");
+ options.setStreaming(isStreaming);
+ FlinkJobInvocation jobInvocation =
+ FlinkJobInvocation.create(
+ "fakeId",
+ "fakeRetrievalToken",
+ flinkJobExecutor,
+ pipelineProto,
+ options,
+ Collections.EMPTY_LIST);
+ jobInvocation.start();
+ long timeout = System.currentTimeMillis() + 30 * 1000;
+ while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis()
< timeout) {
+ Thread.sleep(1000);
+ }
+ assertEquals("job state", Enum.DONE, jobInvocation.getState());
+
+ assertEquals(1, outputValues.size());
+ assertEquals("foo", outputValues.get(0).getKey());
+ assertThat(outputValues.get(0).getValue(), containsInAnyOrder(4L, 3L, 3L));
+ }
+}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index 99c2753786a..7168cd23d6b 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -29,9 +29,9 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
-import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
@@ -44,6 +44,7 @@
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
@@ -69,6 +70,21 @@
public class DockerJobBundleFactory implements JobBundleFactory {
private static final Logger LOG =
LoggerFactory.getLogger(DockerJobBundleFactory.class);
+ /** Factory that creates {@link JobBundleFactory} for the given {@link
JobInfo}. */
+ public interface JobBundleFactoryFactory {
+ DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
+ }
+ // TODO: a hacky way to override the factory for testing.
+ // Should be replaced with a mechanism that lets users configure their own
factory
+ public static final AtomicReference<JobBundleFactoryFactory> FACTORY =
+ new AtomicReference(
+ new JobBundleFactoryFactory() {
+ @Override
+ public DockerJobBundleFactory create(JobInfo jobInfo) throws
Exception {
+ return new DockerJobBundleFactory(jobInfo);
+ }
+ });
+
// TODO: This host name seems to change with every other Docker release. Do
we attempt to keep up
// or attempt to document the supported Docker version(s)?
private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
@@ -82,6 +98,10 @@
private final LoadingCache<Environment, WrappedSdkHarnessClient>
environmentCache;
public static DockerJobBundleFactory create(JobInfo jobInfo) throws
Exception {
+ return FACTORY.get().create(jobInfo);
+ }
+
+ protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
ServerFactory serverFactory = getServerFactory();
IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
ControlClientPool clientPool = MapControlClientPool.create();
@@ -100,27 +120,25 @@ public static DockerJobBundleFactory create(JobInfo
jobInfo) throws Exception {
GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()),
serverFactory);
- DockerEnvironmentFactory environmentFactory =
- DockerEnvironmentFactory.forServices(
+ EnvironmentFactory environmentFactory =
+ getEnvironmentFactory(
controlServer,
loggingServer,
retrievalServer,
provisioningServer,
clientPool.getSource(),
IdGenerators.incrementingLongs());
- return new DockerJobBundleFactory(
- environmentFactory,
- serverFactory,
- stageIdGenerator,
- controlServer,
- loggingServer,
- retrievalServer,
- provisioningServer);
+ this.stageIdGenerator = stageIdGenerator;
+ this.controlServer = controlServer;
+ this.loggingServer = loggingServer;
+ this.retrievalServer = retrievalServer;
+ this.provisioningServer = provisioningServer;
+ this.environmentCache = createEnvironmentCache(environmentFactory,
serverFactory);
}
@VisibleForTesting
DockerJobBundleFactory(
- DockerEnvironmentFactory environmentFactory,
+ EnvironmentFactory environmentFactory,
ServerFactory serverFactory,
IdGenerator stageIdGenerator,
GrpcFnServer<FnApiControlClientPoolService> controlServer,
@@ -132,28 +150,31 @@ public static DockerJobBundleFactory create(JobInfo
jobInfo) throws Exception {
this.loggingServer = loggingServer;
this.retrievalServer = retrievalServer;
this.provisioningServer = provisioningServer;
- this.environmentCache =
- CacheBuilder.newBuilder()
- .removalListener(
- ((RemovalNotification<Environment, WrappedSdkHarnessClient>
notification) -> {
- LOG.debug("Cleaning up for environment {}",
notification.getKey().getUrl());
- try {
- notification.getValue().close();
- } catch (Exception e) {
- LOG.warn(
- String.format("Error cleaning up environment %s",
notification.getKey()),
- e);
- }
- }))
- .build(
- new CacheLoader<Environment, WrappedSdkHarnessClient>() {
- @Override
- public WrappedSdkHarnessClient load(Environment environment)
throws Exception {
- RemoteEnvironment remoteEnvironment =
- environmentFactory.createEnvironment(environment);
- return WrappedSdkHarnessClient.wrapping(remoteEnvironment,
serverFactory);
- }
- });
+ this.environmentCache = createEnvironmentCache(environmentFactory,
serverFactory);
+ }
+
+ private LoadingCache<Environment, WrappedSdkHarnessClient>
createEnvironmentCache(
+ EnvironmentFactory environmentFactory, ServerFactory serverFactory) {
+ return CacheBuilder.newBuilder()
+ .removalListener(
+ ((RemovalNotification<Environment, WrappedSdkHarnessClient>
notification) -> {
+ LOG.debug("Cleaning up for environment {}",
notification.getKey().getUrl());
+ try {
+ notification.getValue().close();
+ } catch (Exception e) {
+ LOG.warn(
+ String.format("Error cleaning up environment %s",
notification.getKey()), e);
+ }
+ }))
+ .build(
+ new CacheLoader<Environment, WrappedSdkHarnessClient>() {
+ @Override
+ public WrappedSdkHarnessClient load(Environment environment)
throws Exception {
+ RemoteEnvironment remoteEnvironment =
+ environmentFactory.createEnvironment(environment);
+ return WrappedSdkHarnessClient.wrapping(remoteEnvironment,
serverFactory);
+ }
+ });
}
@Override
@@ -187,7 +208,7 @@ public void close() throws Exception {
provisioningServer.close();
}
- private static ServerFactory getServerFactory() {
+ protected ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
return ServerFactory.createDefault();
@@ -355,12 +376,20 @@ public void close() throws Exception {
OTHER,
}
- // TODO: Remove this once a real artifact retrieval service has been wired
in.
- private static class UnimplementedArtifactRetrievalService
- extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase
- implements ArtifactRetrievalService {
-
- @Override
- public void close() throws Exception {}
+ /** Create {@link EnvironmentFactory} for the given services. */
+ protected EnvironmentFactory getEnvironmentFactory(
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ ControlClientPool.Source clientSource,
+ IdGenerator idGenerator) {
+ return DockerEnvironmentFactory.forServices(
+ controlServiceServer,
+ loggingServiceServer,
+ retrievalServiceServer,
+ provisioningServiceServer,
+ clientSource,
+ idGenerator);
}
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
index 334f4ef29a9..3499c01d881 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
@@ -108,7 +108,7 @@ public RemoteEnvironment createEnvironment(Environment
container) throws Excepti
e);
throw e;
}
- return (Object) null;
+ return null;
});
executor.submit(
() -> {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 124661)
Time Spent: 5.5h (was: 5h 20m)
> Integration test for portable Flink runner basic batch/streaming execution
> --------------------------------------------------------------------------
>
> Key: BEAM-4791
> URL: https://issues.apache.org/jira/browse/BEAM-4791
> Project: Beam
> Issue Type: Task
> Components: runner-flink
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Labels: portability
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Run a simple pipeline in embedded mode to verify the basic workings of
> JobInvocation/translation/execution for both batch and streaming. Although
> there will be some overlap with (future) ValidatesRunner, this test is meant
> to run as part of the unit tests.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)