[ https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172754 ]
ASF GitHub Bot logged work on BEAM-6159:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Dec/18 18:13
Start Date: 06/Dec/18 18:13
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #7015: [BEAM-6159] Migrate dataflow portable worker using shared library to process bundle
URL: https://github.com/apache/beam/pull/7015
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
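Two Dataflow experiment flags gate the code paths touched below: "beam_fn_api" selects the portable worker, and the new "use_executable_stage_bundle_execution" routes bundle processing through the shared fn-execution library. A minimal sketch of turning both on; the flag names come from this diff, while the options plumbing is standard Beam and shown only for illustration:

    import java.util.Arrays;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class EnableExecutableStageExperiments {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // Both flags are required for the new path added by this PR.
        options.setExperiments(
            Arrays.asList("beam_fn_api", "use_executable_stage_bundle_execution"));
      }
    }

The full diff follows.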
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 3feb7064b895..1d1c56cb80f3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -197,7 +197,7 @@ public void execute() throws Exception {
     EnvironmentFactory environmentFactory =
         createEnvironmentFactory(control, logging, artifact, provisioning, controlClientPool);
     JobBundleFactory jobBundleFactory =
-        SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, data, state);
+        SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, data, state, null);
     TransformEvaluatorRegistry transformRegistry =
         TransformEvaluatorRegistry.portableRegistry(
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index f0f877283154..2dfaac966827 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -111,7 +111,7 @@ public void setup() throws Exception {
     bundleFactory = ImmutableListBundleFactory.create();
     JobBundleFactory jobBundleFactory =
         SingleEnvironmentInstanceJobBundleFactory.create(
-            environmentFactory, dataServer, stateServer);
+            environmentFactory, dataServer, stateServer, null);
     factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 1433702f714d..f0e8ddb73456 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -37,6 +37,7 @@
 import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
+import org.apache.beam.runners.dataflow.worker.graph.CreateExecutableStageNodeFunction;
 import org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction;
 import org.apache.beam.runners.dataflow.worker.graph.DeduceFlattenLocationsFunction;
 import org.apache.beam.runners.dataflow.worker.graph.DeduceNodeLocationsFunction;
@@ -218,18 +219,32 @@ protected BatchDataflowWorker(
     // TODO: this conditional -> two implementations of common interface, or param/injection
     if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
-      Function<MutableNetwork<Node, Edge>, Node> sdkFusedStage =
-          pipeline == null
-              ? RegisterNodeFunction.withoutPipeline(
-                  idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
-              : RegisterNodeFunction.forPipeline(
-                  pipeline, idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
+      Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> transformToRunnerNetwork;
+      Function<MutableNetwork<Node, Edge>, Node> sdkFusedStage;
       Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> lengthPrefixUnknownCoders =
           LengthPrefixUnknownCoders::forSdkNetwork;
-      Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> transformToRunnerNetwork =
-          new CreateRegisterFnOperationFunction(
-              idGenerator, this::createPortNode, lengthPrefixUnknownCoders.andThen(sdkFusedStage));
-
+      if (DataflowRunner.hasExperiment(options, "use_executable_stage_bundle_execution")) {
+        sdkFusedStage = new CreateExecutableStageNodeFunction(pipeline, idGenerator);
+        transformToRunnerNetwork =
+            new CreateRegisterFnOperationFunction(
+                idGenerator,
+                this::createPortNode,
+                lengthPrefixUnknownCoders.andThen(sdkFusedStage),
+                true);
+      } else {
+        sdkFusedStage =
+            pipeline == null
+                ? RegisterNodeFunction.withoutPipeline(
+                    idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+                : RegisterNodeFunction.forPipeline(
+                    pipeline, idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor());
+        transformToRunnerNetwork =
+            new CreateRegisterFnOperationFunction(
+                idGenerator,
+                this::createPortNode,
+                lengthPrefixUnknownCoders.andThen(sdkFusedStage),
+                false);
+      }
       mapTaskToNetwork =
           mapTaskToBaseNetwork
               .andThen(new ReplacePgbkWithPrecombineFunction())
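The composition handed to CreateRegisterFnOperationFunction in both branches above is plain java.util.function composition: lengthPrefixUnknownCoders runs first and its output feeds sdkFusedStage. A self-contained toy showing the same andThen mechanics; the stand-in functions here are illustrative, not Beam types:

    import java.util.function.Function;

    public class AndThenDemo {
      public static void main(String[] args) {
        Function<String, String> fixUp = String::trim;    // plays the role of lengthPrefixUnknownCoders
        Function<String, Integer> fuse = String::length;  // plays the role of sdkFusedStage
        Function<String, Integer> combined = fixUp.andThen(fuse);
        System.out.println(combined.apply("  executable stage  ")); // prints 16
      }
    }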
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index a76f326f33b4..7db53391fc57 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -32,6 +32,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.graph.MutableNetwork;
 import com.google.common.graph.Network;
 import java.util.ArrayList;
@@ -40,6 +41,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
@@ -53,14 +55,15 @@
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.fn.control.ProcessRemoteBundleOperation;
 import org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation;
-import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation;
 import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Networks;
 import org.apache.beam.runners.dataflow.worker.graph.Networks.TypeSafeNodeFunction;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.ExecutableStageNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.FetchAndFilterStreamingSideInputsNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
@@ -84,7 +87,12 @@
 import org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.StaticRemoteEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
@@ -118,7 +126,7 @@ private BeamFnMapTaskExecutorFactory() {}
   @Override
   public DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
+      GrpcFnServer<GrpcDataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
       GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
@@ -160,6 +168,41 @@ public DataflowMapTaskExecutor create(
                 // TODO: Set NameContext properly for these operations.
                 executionContext.createOperationContext(
                     NameContext.create(stageName, stageName, stageName, stageName))));
+    if (DataflowRunner.hasExperiment(
+        options.as(DataflowPipelineDebugOptions.class), "use_executable_stage_bundle_execution")) {
+      LOG.debug("Using SingleEnvironmentInstanceJobBundleFactory");
+      JobBundleFactory jobBundleFactory =
+          SingleEnvironmentInstanceJobBundleFactory.create(
+              StaticRemoteEnvironmentFactory.forService(instructionRequestHandler),
+              grpcDataFnServer,
+              grpcStateFnServer,
+              idGenerator);
+      // If the use_executable_stage_bundle_execution is enabled, use ExecutableStage instead.
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForExecutableStageNode(
+              network, stageName, executionContext, jobBundleFactory));
+    } else {
+      // Swap out all the RegisterFnRequest nodes with Operation nodes
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForRegisterFnNodes(
+              idGenerator,
+              instructionRequestHandler,
+              grpcStateFnServer.getService(),
+              stageName,
+              executionContext));
+      // Swap out all the RemoteGrpcPort nodes with Operation nodes, note that it is expected
+      // that the RegisterFnRequest nodes have already been replaced.
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForGrpcPortNodes(
+              network,
+              grpcDataFnServer.getService(),
+              // TODO: Set NameContext properly for these operations.
+              executionContext.createOperationContext(
+                  NameContext.create(stageName, stageName, stageName, stageName))));
+    }
     // Swap out all the FetchAndFilterStreamingSideInput nodes with operation nodes
     Networks.replaceDirectedNetworkNodes(
@@ -302,6 +345,36 @@ public Node typedApply(RemoteGrpcPortNode input) {
     };
   }
+  private Function<Node, Node> createOperationTransformForExecutableStageNode(
+      final Network<Node, Edge> network,
+      final String stageName,
+      final DataflowExecutionContext<?> executionContext,
+      final JobBundleFactory jobBundleFactory) {
+    return new TypeSafeNodeFunction<ExecutableStageNode>(ExecutableStageNode.class) {
+      @Override
+      public Node typedApply(ExecutableStageNode input) {
+        StageBundleFactory stageBundleFactory =
+            jobBundleFactory.forStage(input.getExecutableStage());
+        Iterable<OutputReceiverNode> outputReceiverNodes =
+            Iterables.filter(network.successors(input), OutputReceiverNode.class);
+
+        OutputReceiver[] outputReceivers = new OutputReceiver[Iterables.size(outputReceiverNodes)];
+        Lists.newArrayList(outputReceiverNodes)
+            .stream()
+            .map(outputReceiverNode -> outputReceiverNode.getOutputReceiver())
+            .collect(Collectors.toList())
+            .toArray(outputReceivers);
+
+        return OperationNode.create(
+            new ProcessRemoteBundleOperation(
+                executionContext.createOperationContext(
+                    NameContext.create(stageName, stageName, stageName, stageName)),
+                stageBundleFactory,
+                outputReceivers));
+      }
+    };
+  }
+
private Function<Node, Node> createOperationTransformForRegisterFnNodes(
final IdGenerator idGenerator,
final InstructionRequestHandler instructionRequestHandler,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
index 4e6e5a11eb08..0873ab17ebce 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
@@ -21,11 +21,11 @@
 import com.google.common.graph.MutableNetwork;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -39,7 +39,7 @@
    */
   DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
+      GrpcFnServer<GrpcDataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
       GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 99eee23c6ce4..4ba66ba56ceb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -46,7 +46,6 @@
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService.DataService;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Networks;
@@ -71,6 +70,7 @@
 import org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -99,7 +99,7 @@ private IntrinsicMapTaskExecutorFactory() {}
   @Override
   public DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      GrpcFnServer<DataService> grpcDataFnServer,
+      GrpcFnServer<GrpcDataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
       GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
index a0d54eada51b..0f982164983f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
@@ -28,6 +28,7 @@
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,7 +188,7 @@ public String getWorkerId() {
     @Override
     @Nullable
-    public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer() {
+    public GrpcFnServer<GrpcDataService> getGrpcDataFnServer() {
       return GrpcFnServer.create(
           beamFnDataGrpcService.getDataService(getWorkerId()), beamFnDataApiServiceDescriptor());
     }
@@ -229,7 +230,7 @@ public String getWorkerId() {
     @Nullable
     @Override
-    public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer() {
+    public GrpcFnServer<GrpcDataService> getGrpcDataFnServer() {
       return null;
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
index 383d292df7c9..e25c87bc7cf6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
@@ -19,9 +19,9 @@
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;

 /** Registry used to manage all the connections (Control, Data, State) from SdkHarness */
@@ -61,7 +61,7 @@
   public String getWorkerId();

   @Nullable
-  public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer();
+  public GrpcFnServer<GrpcDataService> getGrpcDataFnServer();

   @Nullable
   public GrpcFnServer<GrpcStateService> getGrpcStateFnServer();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d75c70327c3b..d6de9071d743 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -641,7 +641,10 @@ public void run() {
           Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> transformToRunnerNetwork =
               new CreateRegisterFnOperationFunction(
-                  idGenerator, this::createPortNode, lengthPrefixUnknownCoders.andThen(sdkFusedStage));
+                  idGenerator,
+                  this::createPortNode,
+                  lengthPrefixUnknownCoders.andThen(sdkFusedStage),
+                  false);

           mapTaskToNetwork =
               mapTaskToBaseNetwork
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
new file mode 100644
index 000000000000..3d44331dd5e5
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link org.apache.beam.runners.dataflow.worker.util.common.worker.Operation} is responsible
+ * for communicating with the SDK harness and asking it to process a bundle of work. This operation
+ * requests a {@link org.apache.beam.runners.fnexecution.control.RemoteBundle}, sends data elements
+ * to the SDK, receives processed results from the SDK, then passes these elements to the next
+ * Operations.
+ */
+public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
+  private static final Logger LOG = LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
+  private final StageBundleFactory stageBundleFactory;
+  private final OutputReceiverFactory receiverFactory =
+      new OutputReceiverFactory() {
+        @Override
+        public FnDataReceiver<?> create(String pCollectionId) {
+          return receivedElement -> {
+            for (OutputReceiver receiver : receivers) {
+              LOG.debug("Consume element {}", receivedElement);
+              receiver.process((WindowedValue<?>) receivedElement);
+            }
+          };
+        }
+      };
+  private final StateRequestHandler stateRequestHandler;
+  private final BundleProgressHandler progressHandler;
+  private RemoteBundle remoteBundle;
+
+  public ProcessRemoteBundleOperation(
+      OperationContext context, StageBundleFactory stageBundleFactory, OutputReceiver[] receivers) {
+    super(receivers, context);
+    this.stageBundleFactory = stageBundleFactory;
+    stateRequestHandler = StateRequestHandler.unsupported();
+    progressHandler = BundleProgressHandler.ignored();
+  }
+
+  @Override
+  public void start() throws Exception {
+    try (Closeable scope = context.enterStart()) {
+      super.start();
+      try {
+        remoteBundle =
+            stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to start remote bundle", e);
+      }
+    }
+  }
+
+  @Override
+  public void process(Object inputElement) throws Exception {
+    LOG.debug(String.format("Sending value: %s", inputElement));
+    try (Closeable scope = context.enterProcess()) {
+      Iterables.getOnlyElement(remoteBundle.getInputReceivers().values())
+          .accept((WindowedValue<InputT>) inputElement);
+    }
+  }
+
+  @Override
+  public void finish() throws Exception {
+    try (Closeable scope = context.enterFinish()) {
+      try {
+        // close blocks until all results are received
+        remoteBundle.close();
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to finish remote bundle", e);
+      }
+    }
+  }
+}
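The new operation above drives the standard fn-execution bundle lifecycle. A condensed sketch of that lifecycle, using only calls that appear in the file (imports elided; the factory, receiver factory, and element are placeholder parameters, and this is an illustration rather than additional PR code):

    static <InputT> void runBundle(
        StageBundleFactory stageBundleFactory,
        OutputReceiverFactory receiverFactory,
        Object inputElement) throws Exception {
      // Mirrors start(): open a bundle against the SDK harness.
      RemoteBundle bundle =
          stageBundleFactory.getBundle(
              receiverFactory, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());
      // Mirrors process(): exactly one input receiver is expected.
      Iterables.getOnlyElement(bundle.getInputReceivers().values())
          .accept((WindowedValue<InputT>) inputElement);
      // Mirrors finish(): close blocks until all results are received.
      bundle.close();
    }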
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
index d81b902be88c..0081f7ccf524 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
@@ -32,9 +32,8 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.fn.grpc.BeamFnService;
-import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
 import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
@@ -205,55 +204,46 @@ public void fail(Throwable t) {
     }
   }

-  // A wrapper class
-  public class DataService extends BeamFnDataGrpc.BeamFnDataImplBase
-      implements FnDataService, FnService {
-    private final String clientId;
+  /** Get the anonymous subclass of GrpcDataService for the clientId */
+  public GrpcDataService getDataService(final String clientId) {
+    return new GrpcDataService() {
+      @Override
+      public <T> InboundDataClient receive(
+          LogicalEndpoint inputLocation,
+          Coder<WindowedValue<T>> coder,
+          FnDataReceiver<WindowedValue<T>> consumer) {
+        LOG.debug("Registering consumer for {}", inputLocation);

-    public DataService(String clientId) {
-      this.clientId = clientId;
-    }
-
-    @Override
-    public <T> InboundDataClient receive(
-        LogicalEndpoint inputLocation,
-        Coder<WindowedValue<T>> coder,
-        FnDataReceiver<WindowedValue<T>> consumer) {
-      LOG.debug("Registering consumer for {}", inputLocation);
-
-      return new DeferredInboundDataClient(this.clientId, inputLocation, coder, consumer);
-    }
+        return new DeferredInboundDataClient(clientId, inputLocation, coder, consumer);
+      }

-    @Override
-    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
-        LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
-      LOG.debug("Creating output consumer for {}", outputLocation);
-      try {
-        if (outboundBufferLimit.isPresent()) {
-          return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
-              outboundBufferLimit.get(),
-              outputLocation,
-              coder,
-              getClientFuture(this.clientId).get().getOutboundObserver());
-        } else {
-          return BeamFnDataBufferingOutboundObserver.forLocation(
-              outputLocation, coder, getClientFuture(this.clientId).get().getOutboundObserver());
+      @Override
+      public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
+          LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
+        LOG.debug("Creating output consumer for {}", outputLocation);
+        try {
+          if (outboundBufferLimit.isPresent()) {
+            return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
+                outboundBufferLimit.get(),
+                outputLocation,
+                coder,
+                getClientFuture(clientId).get().getOutboundObserver());
+          } else {
+            return BeamFnDataBufferingOutboundObserver.forLocation(
+                outputLocation, coder, getClientFuture(clientId).get().getOutboundObserver());
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e);
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
       }
-    }
-
-    @Override
-    public void close() throws Exception {}
-  }

-  /** Get the DataService for the clientId */
-  public DataService getDataService(final String clientId) {
-    return new DataService(clientId);
+      /** It is intended to do nothing in close. */
+      @Override
+      public void close() throws Exception {}
+    };
   }
@Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
new file mode 100644
index 000000000000..ab8cd0b8596a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.graph;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.dataflow.model.InstructionOutput;
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.MultiOutputInfo;
+import com.google.api.services.dataflow.model.ParDoInstruction;
+import com.google.api.services.dataflow.model.ParallelInstruction;
+import com.google.api.services.dataflow.model.ReadInstruction;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.graph.MutableNetwork;
+import com.google.common.graph.Network;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.CombinePhase;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.graph.Edges.DefaultEdge;
+import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
+import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.ExecutableStageNode;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.RemoteGrpcPortNode;
+import org.apache.beam.runners.dataflow.worker.util.CloudSourceUtils;
+import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Converts a {@link Network} representation of {@link MapTask} destined for the SDK harness into
+ * a {@link Node} containing an
+ * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}.
+ */
+public class CreateExecutableStageNodeFunction
+    implements Function<MutableNetwork<Node, Edge>, Node> {
+  private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
+
+  private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
+  private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
+
+  public static final String COMBINE_PER_KEY_URN =
+      BeamUrns.getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY);
+  public static final String COMBINE_PRECOMBINE_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE);
+  public static final String COMBINE_MERGE_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS);
+  public static final String COMBINE_EXTRACT_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS);
+  public static final String COMBINE_GROUPED_VALUES_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_GROUPED_VALUES);
+
+  private static final String SERIALIZED_SOURCE = "serialized_source";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final IdGenerator idGenerator;
+  private final @Nullable RunnerApi.Pipeline pipeline;
+
+  public CreateExecutableStageNodeFunction(RunnerApi.Pipeline pipeline, IdGenerator idGenerator) {
+    this.pipeline = pipeline;
+    this.idGenerator = idGenerator;
+  }
+
+  @Override
+  public Node apply(MutableNetwork<Node, Edge> input) {
+    for (Node node : input.nodes()) {
+      if (node instanceof RemoteGrpcPortNode
+          || node instanceof ParallelInstructionNode
+          || node instanceof InstructionOutputNode) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+          String.format("Network contains unknown type of node: %s", input));
+    }
+
+    // Fix all non output nodes to have named edges.
+    for (Node node : input.nodes()) {
+      if (node instanceof InstructionOutputNode) {
+        continue;
+      }
+      for (Node successor : input.successors(node)) {
+        for (Edge edge : input.edgesConnecting(node, successor)) {
+          if (edge instanceof DefaultEdge) {
+            input.removeEdge(edge);
+            input.addEdge(
+                node,
+                successor,
+                MultiOutputInfoEdge.create(new MultiOutputInfo().setTag(idGenerator.getId())));
+          }
+        }
+      }
+    }
+
+    RunnerApi.Components.Builder componentsBuilder = RunnerApi.Components.newBuilder();
+    componentsBuilder.mergeFrom(this.pipeline.getComponents());
+
+    // We start off by replacing all edges within the graph with edges that have the named
+    // outputs from the predecessor step. For ParallelInstruction Source nodes and RemoteGrpcPort
+    // nodes this is a generated port id. All ParDoInstructions will have already
+
+    // For intermediate PCollections we fabricate, we make a bogus WindowingStrategy
+    // TODO: create a correct windowing strategy, including coders and environment
+    // An SdkFunctionSpec is invalid without a working environment reference. We can revamp that
+    // when we inline SdkFunctionSpec and FunctionSpec, both slated for inlining wherever they occur
+
+    // Default to use the Java environment if pipeline doesn't have environment specified.
+    if (pipeline.getComponents().getEnvironmentsMap().isEmpty()) {
+      String envId = Environments.JAVA_SDK_HARNESS_ENVIRONMENT.getUrn() + idGenerator.getId();
+      componentsBuilder.putEnvironments(envId, Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
+    }
+
+    // Use default WindowingStrategy as the fake one.
+    // TODO: should get real WindowingStrategy from pipeline proto.
+    String fakeWindowingStrategyId = "fakeWindowingStrategy" + idGenerator.getId();
+    SdkComponents sdkComponents = SdkComponents.create(pipeline.getComponents());
+    try {
+      RunnerApi.MessageWithComponents fakeWindowingStrategyProto =
+          WindowingStrategyTranslation.toMessageProto(
+              WindowingStrategy.globalDefault(), sdkComponents);
+      componentsBuilder.putWindowingStrategies(
+          fakeWindowingStrategyId, fakeWindowingStrategyProto.getWindowingStrategy());
+      componentsBuilder.putAllCoders(fakeWindowingStrategyProto.getComponents().getCodersMap());
+      componentsBuilder.putAllEnvironments(
+          fakeWindowingStrategyProto.getComponents().getEnvironmentsMap());
+    } catch (IOException exc) {
+      throw new RuntimeException("Could not convert default windowing strategy to proto", exc);
+    }
+
+    Map<Node, String> nodesToPCollections = new HashMap<>();
+    ImmutableMap.Builder<String, NameContext> ptransformIdToNameContexts = ImmutableMap.builder();
+    // A field of ExecutableStage which includes the PCollection goes to worker side.
+    Set<PCollectionNode> executableStageOutputs = new HashSet<>();
+    // A field of ExecutableStage which includes the PCollection goes to runner side.
+    Set<PCollectionNode> executableStageInputs = new HashSet<>();
+
+    for (InstructionOutputNode node :
+        Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
+      InstructionOutput instructionOutput = node.getInstructionOutput();
+
+      String coderId = "generatedCoder" + idGenerator.getId();
+      try (ByteString.Output output = ByteString.newOutput()) {
+        try {
+          Coder<?> javaCoder =
+              CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));
+          Coder<?> elementCoder = ((WindowedValueCoder<?>) javaCoder).getValueCoder();
+          sdkComponents.registerCoder(elementCoder);
+          RunnerApi.Coder coderProto = CoderTranslation.toProto(elementCoder, sdkComponents);
+          componentsBuilder.putCoders(coderId, coderProto);
+        } catch (IOException e) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Unable to encode coder %s for output %s",
+                  instructionOutput.getCodec(), instructionOutput),
+              e);
+        } catch (Exception e) {
+          // Coder probably wasn't a java coder
+          OBJECT_MAPPER.writeValue(output, instructionOutput.getCodec());
+          componentsBuilder.putCoders(
+              coderId,
+              RunnerApi.Coder.newBuilder()
+                  .setSpec(
+                      RunnerApi.SdkFunctionSpec.newBuilder()
+                          .setSpec(
+                              RunnerApi.FunctionSpec.newBuilder()
+                                  .setPayload(output.toByteString())))
+                  .build());
+        }
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unable to encode coder %s for output %s",
+                instructionOutput.getCodec(), instructionOutput),
+            e);
+      }
+
+      String pcollectionId = "generatedPcollection" + idGenerator.getId();
+      RunnerApi.PCollection pCollection =
+          RunnerApi.PCollection.newBuilder()
+              .setCoderId(coderId)
+              .setWindowingStrategyId(fakeWindowingStrategyId)
+              .build();
+      nodesToPCollections.put(node, pcollectionId);
+      componentsBuilder.putPcollections(pcollectionId, pCollection);
+
+      // Check whether this output collection has consumers from worker side when
+      // "use_executable_stage_bundle_execution" is set
+      if (input.successors(node).stream().anyMatch(RemoteGrpcPortNode.class::isInstance)) {
+        executableStageOutputs.add(PipelineNode.pCollection(pcollectionId, pCollection));
+      }
+      if (input.predecessors(node).stream().anyMatch(RemoteGrpcPortNode.class::isInstance)) {
+        executableStageInputs.add(PipelineNode.pCollection(pcollectionId, pCollection));
+      }
+    }
+
+    componentsBuilder.putAllCoders(sdkComponents.toComponents().getCodersMap());
+    Set<PTransformNode> executableStageTransforms = new HashSet<>();
+
+    for (ParallelInstructionNode node :
+        Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
+      ParallelInstruction parallelInstruction = node.getParallelInstruction();
+      String ptransformId = "generatedPtransform" + idGenerator.getId();
+      ptransformIdToNameContexts.put(
+          ptransformId,
+          NameContext.create(
+              null,
+              parallelInstruction.getOriginalName(),
+              parallelInstruction.getSystemName(),
+              parallelInstruction.getName()));
+
+      RunnerApi.PTransform.Builder pTransform = RunnerApi.PTransform.newBuilder();
+      RunnerApi.FunctionSpec.Builder transformSpec = RunnerApi.FunctionSpec.newBuilder();
+
+      if (parallelInstruction.getParDo() != null) {
+        ParDoInstruction parDoInstruction = parallelInstruction.getParDo();
+        CloudObject userFnSpec = CloudObject.fromSpec(parDoInstruction.getUserFn());
+        String userFnClassName = userFnSpec.getClassName();
+
+        if (userFnClassName.equals("CombineValuesFn") || userFnClassName.equals("KeyedCombineFn")) {
+          transformSpec = transformCombineValuesFnToFunctionSpec(userFnSpec);
+        } else {
+          String parDoPTransformId = getString(userFnSpec, PropertyNames.SERIALIZED_FN);
+
+          RunnerApi.PTransform parDoPTransform =
+              pipeline == null
+                  ? null
+                  : pipeline.getComponents().getTransformsOrDefault(parDoPTransformId, null);
+
+          // TODO: only the non-null branch should exist; for migration ease only
+          if (parDoPTransform != null) {
+            checkArgument(
+                parDoPTransform
+                    .getSpec()
+                    .getUrn()
+                    .equals(PTransformTranslation.PAR_DO_TRANSFORM_URN),
+                "Found transform \"%s\" for ParallelDo instruction, "
+                    + " but that transform had unexpected URN \"%s\" (expected \"%s\")",
+                parDoPTransformId,
+                parDoPTransform.getSpec().getUrn(),
+                PTransformTranslation.PAR_DO_TRANSFORM_URN);
+
+            RunnerApi.ParDoPayload parDoPayload;
+            try {
+              parDoPayload =
+                  RunnerApi.ParDoPayload.parseFrom(parDoPTransform.getSpec().getPayload());
+            } catch (InvalidProtocolBufferException exc) {
+              throw new RuntimeException("ParDo did not have a ParDoPayload", exc);
+            }
+
+            transformSpec
+                .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                .setPayload(parDoPayload.toByteString());
+          } else {
+            // legacy path - bytes are the SdkFunctionSpec's payload field, basically, and
+            // SDKs expect it in the PTransform's payload field
+            byte[] userFnBytes = getBytes(userFnSpec, PropertyNames.SERIALIZED_FN);
+            transformSpec
+                .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+                .setPayload(ByteString.copyFrom(userFnBytes));
+          }
+        }
+      } else if (parallelInstruction.getRead() != null) {
+        ReadInstruction readInstruction = parallelInstruction.getRead();
+        CloudObject sourceSpec =
+            CloudObject.fromSpec(
+                CloudSourceUtils.flattenBaseSpecs(readInstruction.getSource()).getSpec());
+        // TODO: Need to plumb through the SDK specific function spec.
+        transformSpec.setUrn(JAVA_SOURCE_URN);
+        try {
+          byte[] serializedSource =
+              Base64.getDecoder().decode(getString(sourceSpec, SERIALIZED_SOURCE));
+          ByteString sourceByteString = ByteString.copyFrom(serializedSource);
+          transformSpec.setPayload(sourceByteString);
+        } catch (Exception e) {
+          throw new IllegalArgumentException(
+              String.format("Unable to process Read %s", parallelInstruction), e);
+        }
+      } else if (parallelInstruction.getFlatten() != null) {
+        transformSpec.setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Unknown type of ParallelInstruction %s", parallelInstruction));
+      }
+
+      for (Node predecessorOutput : input.predecessors(node)) {
+        pTransform.putInputs(
+            "generatedInput" + idGenerator.getId(), nodesToPCollections.get(predecessorOutput));
+      }
+
+      for (Edge edge : input.outEdges(node)) {
+        Node nodeOutput = input.incidentNodes(edge).target();
+        MultiOutputInfoEdge edge2 = (MultiOutputInfoEdge) edge;
+        pTransform.putOutputs(
+            edge2.getMultiOutputInfo().getTag(), nodesToPCollections.get(nodeOutput));
+      }
+
+      pTransform.setSpec(transformSpec);
+      executableStageTransforms.add(PipelineNode.pTransform(ptransformId, pTransform.build()));
+    }
+
+    PCollectionNode executableInput = executableStageInputs.iterator().next();
+    RunnerApi.Components executableStageComponents = componentsBuilder.build();
+
+    // Get Environment from ptransform, otherwise, use JAVA_SDK_HARNESS_ENVIRONMENT as default.
+    Environment executableStageEnv =
+        getEnvironmentFromPTransform(executableStageComponents, executableStageTransforms);
+    if (executableStageEnv == null) {
+      executableStageEnv = Environments.JAVA_SDK_HARNESS_ENVIRONMENT;
+    }
+
+    Set<SideInputReference> executableStageSideInputs = new HashSet<>();
+    Set<TimerReference> executableStageTimers = new HashSet<>();
+    Set<UserStateReference> executableStageUserStateReference = new HashSet<>();
+    ExecutableStage executableStage =
+        ImmutableExecutableStage.ofFullComponents(
+            executableStageComponents,
+            executableStageEnv,
+            executableInput,
+            executableStageSideInputs,
+            executableStageUserStateReference,
+            executableStageTimers,
+            executableStageTransforms,
+            executableStageOutputs);
+    return ExecutableStageNode.create(executableStage, ptransformIdToNameContexts.build());
+  }
+
+  private Environment getEnvironmentFromPTransform(
+      RunnerApi.Components components, Set<PTransformNode> sdkTransforms) {
+    RehydratedComponents sdkComponents = RehydratedComponents.forComponents(components);
+    Environment env = null;
+    for (PTransformNode pTransformNode : sdkTransforms) {
+      env = Environments.getEnvironment(pTransformNode.getTransform(), sdkComponents).orElse(null);
+      if (env != null) {
+        break;
+      }
+    }
+
+    return env;
+  }
+
+  /**
+   * Transforms a CombineValuesFn {@link ParDoInstruction} to an Apache Beam {@link
+   * RunnerApi.FunctionSpec}.
+   */
+  private RunnerApi.FunctionSpec.Builder transformCombineValuesFnToFunctionSpec(
+      CloudObject userFn) {
+    // Grab the Combine PTransform. This transform is the composite PTransform representing the
+    // entire CombinePerKey, and it contains the CombinePayload we need.
+    String combinePTransformId = getString(userFn, PropertyNames.SERIALIZED_FN);
+
+    RunnerApi.PTransform combinePerKeyPTransform =
+        pipeline.getComponents().getTransformsOrDefault(combinePTransformId, null);
+    checkArgument(
+        combinePerKeyPTransform != null,
+        "Transform with id \"%s\" not found in pipeline.",
+        combinePTransformId);
+
+    checkArgument(
+        combinePerKeyPTransform.getSpec().getUrn().equals(COMBINE_PER_KEY_URN),
+        "Found transform \"%s\" for Combine instruction, "
+            + "but that transform had unexpected URN \"%s\" (expected \"%s\")",
+        combinePerKeyPTransform,
+        combinePerKeyPTransform.getSpec().getUrn(),
+        COMBINE_PER_KEY_URN);
+
+    RunnerApi.CombinePayload combinePayload;
+    try {
+      combinePayload =
+          RunnerApi.CombinePayload.parseFrom(combinePerKeyPTransform.getSpec().getPayload());
+    } catch (InvalidProtocolBufferException exc) {
+      throw new RuntimeException("Combine did not have a CombinePayload", exc);
+    }
+
+    String phase = getString(userFn, WorkerPropertyNames.PHASE, CombinePhase.ALL);
+    String urn;
+
+    switch (phase) {
+      case CombinePhase.ALL:
+        urn = COMBINE_GROUPED_VALUES_URN;
+        break;
+      case CombinePhase.ADD:
+        urn = COMBINE_PRECOMBINE_URN;
+        break;
+      case CombinePhase.MERGE:
+        urn = COMBINE_MERGE_URN;
+        break;
+      case CombinePhase.EXTRACT:
+        urn = COMBINE_EXTRACT_URN;
+        break;
+      default:
+        throw new RuntimeException("Encountered unknown Combine Phase: " + phase);
+    }
+    return RunnerApi.FunctionSpec.newBuilder()
+        .setUrn(urn)
+        .setPayload(combinePayload.toByteString());
+  }
+}
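Since CreateExecutableStageNodeFunction is a plain Function<MutableNetwork<Node, Edge>, Node>, the worker collapses the fused SDK sub-network with a single apply() call (see the BatchDataflowWorker hunk earlier in this diff, where it is composed via andThen). A hedged sketch; pipeline, idGenerator, and sdkSubnetwork are placeholders for values the worker already holds:

    Function<MutableNetwork<Node, Edge>, Node> fuse =
        new CreateExecutableStageNodeFunction(pipeline, idGenerator);
    Node stageNode = fuse.apply(sdkSubnetwork); // yields an ExecutableStageNode, per the code above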
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
index 2a71acc79803..5a1dffbdc6a9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
@@ -74,10 +74,10 @@
  */
 public class CreateRegisterFnOperationFunction
     implements Function<MutableNetwork<Node, Edge>, MutableNetwork<Node, Edge>> {
-
   private final IdGenerator idGenerator;
   private final BiFunction<String, String, Node> portSupplier;
   private final Function<MutableNetwork<Node, Edge>, Node> registerFnOperationFunction;
+  private final boolean useExecutableStageBundleExecution;

   /**
    * Constructs a function which is able to break up the instruction graph into SDK and Runner
@@ -93,10 +93,12 @@
   public CreateRegisterFnOperationFunction(
       IdGenerator idGenerator,
       BiFunction<String, String, Node> portSupplier,
-      Function<MutableNetwork<Node, Edge>, Node> registerFnOperationFunction) {
+      Function<MutableNetwork<Node, Edge>, Node> registerFnOperationFunction,
+      boolean useExecutableStageBundleExecution) {
     this.idGenerator = idGenerator;
     this.portSupplier = portSupplier;
     this.registerFnOperationFunction = registerFnOperationFunction;
+    this.useExecutableStageBundleExecution = useExecutableStageBundleExecution;
   }
@Override
@@ -187,6 +189,11 @@ public CreateRegisterFnOperationFunction(
     Set<Node> allRunnerNodes =
         Networks.reachableNodes(
             network, Sets.union(runnerRootNodes, sdkToRunnerBoundaries), runnerToSdkBoundaries);
+    if (this.useExecutableStageBundleExecution) {
+      // When using shared library, there is no grpc node in runner graph.
+      allRunnerNodes =
+          Sets.difference(allRunnerNodes, Sets.union(runnerToSdkBoundaries, sdkToRunnerBoundaries));
+    }
     MutableNetwork<Node, Edge> runnerNetwork = Graphs.inducedSubgraph(network, allRunnerNodes);

     // TODO: Reduce the amount of 'copying' of SDK nodes by breaking potential cycles
@@ -204,11 +211,24 @@ public CreateRegisterFnOperationFunction(
       // Create happens before relationships between all Runner and SDK nodes which are in the
       // SDK subnetwork; direction dependent on whether its a predecessor of the SDK subnetwork or
       // a successor.
-      for (Node predecessor : Sets.intersection(sdkSubnetworkNodes, runnerToSdkBoundaries)) {
-        runnerNetwork.addEdge(predecessor, registerFnNode, HappensBeforeEdge.create());
-      }
-      for (Node successor : Sets.intersection(sdkSubnetworkNodes, sdkToRunnerBoundaries)) {
-        runnerNetwork.addEdge(registerFnNode, successor, HappensBeforeEdge.create());
+      if (this.useExecutableStageBundleExecution) {
+        // When using shared library, there is no grpc node in runner graph. Then the registerFnNode
+        // should be linked directly to 2 OutputInstruction nodes.
+        for (Node predecessor : Sets.intersection(sdkSubnetworkNodes, runnerToSdkBoundaries)) {
+          predecessor = network.predecessors(predecessor).iterator().next();
+          runnerNetwork.addEdge(predecessor, registerFnNode, HappensBeforeEdge.create());
+        }
+        for (Node successor : Sets.intersection(sdkSubnetworkNodes, sdkToRunnerBoundaries)) {
+          successor = network.successors(successor).iterator().next();
+          runnerNetwork.addEdge(registerFnNode, successor, HappensBeforeEdge.create());
+        }
+      } else {
+        for (Node predecessor : Sets.intersection(sdkSubnetworkNodes, runnerToSdkBoundaries)) {
+          runnerNetwork.addEdge(predecessor, registerFnNode, HappensBeforeEdge.create());
+        }
+        for (Node successor : Sets.intersection(sdkSubnetworkNodes, sdkToRunnerBoundaries)) {
+          runnerNetwork.addEdge(registerFnNode, successor, HappensBeforeEdge.create());
+        }
       }
     }
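The pruning added above is Guava set algebra: boundary nodes are subtracted from the runner sub-graph because, with the shared library, no gRPC port nodes remain runner-side. A self-contained toy with strings standing in for graph nodes (names are illustrative):

    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Sets;
    import java.util.Set;

    public class BoundaryPruningDemo {
      public static void main(String[] args) {
        Set<String> allRunnerNodes = ImmutableSet.of("read", "port-in", "port-out", "write");
        Set<String> runnerToSdkBoundaries = ImmutableSet.of("port-in");
        Set<String> sdkToRunnerBoundaries = ImmutableSet.of("port-out");
        Set<String> pruned =
            Sets.difference(
                allRunnerNodes, Sets.union(runnerToSdkBoundaries, sdkToRunnerBoundaries));
        System.out.println(pruned); // [read, write]: no grpc port nodes remain
      }
    }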
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 406fa526c107..5d058a97628d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -35,6 +35,7 @@
 import java.util.Map;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
@@ -304,6 +305,29 @@ public String toString() {
     }
   }

+  /** A node that stores {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}. */
+  @AutoValue
+  public abstract static class ExecutableStageNode extends Node {
+    public static ExecutableStageNode create(
+        ExecutableStage executableStage,
+        Map<String, NameContext> ptransformIdToPartialNameContextMap) {
+      checkNotNull(executableStage);
+      checkNotNull(ptransformIdToPartialNameContextMap);
+      return new AutoValue_Nodes_ExecutableStageNode(
+          executableStage, ptransformIdToPartialNameContextMap);
+    }
+
+    public abstract ExecutableStage getExecutableStage();
+
+    public abstract Map<String, NameContext> getPTransformIdToPartialNameContextMap();
+
+    @Override
+    public String toString() {
+      // The request may be very large.
+      return "ExecutableStageNode";
+    }
+  }
+
   /**
    * A node in the graph responsible for fetching side inputs that are ready and also filtering
    * elements which are blocked after asking the SDK harness to perform any window mapping.
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index be947c48276b..6b12b219bd0e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -174,8 +174,8 @@ public void testCreateMapTaskExecutor() throws Exception {
         mapTaskExecutorFactory.create(
             null /* beamFnControlClientHandler */,
             null /* beamFnDataService */,
-            null, /* dataApiServiceDescriptor */
             null /* beamFnStateService */,
+            null,
             mapTaskToNetwork.apply(mapTask),
             options,
             STAGE,
@@ -265,8 +265,8 @@ public void testExecutionContextPlumbing() throws Exception {
         mapTaskExecutorFactory.create(
             null /* beamFnControlClientHandler */,
             null /* beamFnDataService */,
-            null, /* dataApiServiceDescriptor */
             null /* beamFnStateService */,
+            null,
             mapTaskToNetwork.apply(mapTask),
             options,
             STAGE,
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
index cb8d55a7439d..87d4035b9ab2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
@@ -70,7 +70,7 @@ public void setUp() {
MockitoAnnotations.initMocks(this);
createRegisterFnOperation =
new CreateRegisterFnOperationFunction(
- IdGenerators.decrementingLongs(), portSupplier, registerFnOperationFunction);
+ IdGenerators.decrementingLongs(), portSupplier, registerFnOperationFunction, false);
}
@Test
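The only change to this test is the new trailing false argument to the CreateRegisterFnOperationFunction constructor. Bare boolean arguments are easy to misread at call sites; one common alternative (a sketch only, not something this PR does, and the constant name is an assumption since the flag's meaning is defined by CreateRegisterFnOperationFunction itself) is a named constant:

    // Hypothetical readability aid; the name is illustrative, not from the PR.
    private static final boolean USE_EXECUTABLE_STAGE = false;

    createRegisterFnOperation =
        new CreateRegisterFnOperationFunction(
            IdGenerators.decrementingLongs(),
            portSupplier,
            registerFnOperationFunction,
            USE_EXECUTABLE_STAGE);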
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index cf9b8bdbd536..23b752bdf2c1 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -52,8 +52,10 @@
public static JobBundleFactory create(
EnvironmentFactory environmentFactory,
GrpcFnServer<GrpcDataService> data,
- GrpcFnServer<GrpcStateService> state) {
- return new SingleEnvironmentInstanceJobBundleFactory(environmentFactory, data, state);
+ GrpcFnServer<GrpcStateService> state,
+ IdGenerator idGenerator) {
+ return new SingleEnvironmentInstanceJobBundleFactory(
+ environmentFactory, data, state, idGenerator);
}
private final EnvironmentFactory environmentFactory;
@@ -66,15 +68,21 @@ public static JobBundleFactory create(
private final ConcurrentMap<Environment, RemoteEnvironment> environments =
new ConcurrentHashMap<>();
- private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+ private final IdGenerator idGenerator;
private SingleEnvironmentInstanceJobBundleFactory(
EnvironmentFactory environmentFactory,
GrpcFnServer<GrpcDataService> dataService,
- GrpcFnServer<GrpcStateService> stateService) {
+ GrpcFnServer<GrpcStateService> stateService,
+ IdGenerator idGenerator) {
this.environmentFactory = environmentFactory;
this.dataService = dataService;
this.stateService = stateService;
+ if (idGenerator != null) {
+ this.idGenerator = idGenerator;
+ } else {
+ this.idGenerator = IdGenerators.incrementingLongs();
+ }
}
@Override
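In the constructor above, a null idGenerator selects the previous default of IdGenerators.incrementingLongs(), so existing callers that pass null keep the old behavior. The same null fallback can be written as a single expression; a minimal sketch, equivalent to the if/else in the diff:

    // One-expression form of the null fallback shown above; callers that
    // pass null still get the original incrementing-longs generator.
    this.idGenerator =
        (idGenerator != null) ? idGenerator : IdGenerators.incrementingLongs();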
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
index ae4410695f28..9038755a0455 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
@@ -79,6 +79,15 @@ private GrpcDataService(
this.outboundObserverFactory = outboundObserverFactory;
}
+ /** @deprecated This constructor is for Dataflow migration purposes only. */
+ @Deprecated
+ public GrpcDataService() {
+ this.connectedClient = null;
+ this.additionalMultiplexers = null;
+ this.executor = null;
+ this.outboundObserverFactory = null;
+ }
+
@Override
public StreamObserver<BeamFnApi.Elements> data(
final StreamObserver<BeamFnApi.Elements> outboundElementObserver) {
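The deprecated no-argument constructor leaves every field of GrpcDataService null, so invoking the regular data-plane methods on such an instance would fail with a bare NullPointerException. A defensive guard (a sketch only, not part of this PR; the method name is hypothetical) could fail fast with an explanatory message instead:

    // Hypothetical guard; could be called at the top of data-plane entry
    // points to surface misuse of the migration-only constructor.
    private void checkNotMigrationOnly() {
      if (connectedClient == null) {
        throw new IllegalStateException(
            "This GrpcDataService was created with the deprecated "
                + "migration-only constructor and cannot serve data");
      }
    }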
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
new file mode 100644
index 000000000000..655a2e716fc4
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+
+/** A {@link RemoteEnvironment} that connects to the Dataflow runner harness. */
+@ThreadSafe
+public class StaticRemoteEnvironment implements RemoteEnvironment {
+
+ static StaticRemoteEnvironment create(
+ Environment environment, InstructionRequestHandler instructionRequestHandler) {
+ return new StaticRemoteEnvironment(environment, instructionRequestHandler);
+ }
+
+ private final Object lock = new Object();
+ private final Environment environment;
+ private final InstructionRequestHandler instructionRequestHandler;
+
+ private boolean isClosed = false;
+
+ private StaticRemoteEnvironment(
+ Environment environment, InstructionRequestHandler instructionRequestHandler) {
+ this.environment = environment;
+ this.instructionRequestHandler = instructionRequestHandler;
+ }
+
+ @Override
+ public Environment getEnvironment() {
+ return this.environment;
+ }
+
+ @Override
+ public InstructionRequestHandler getInstructionRequestHandler() {
+ return this.instructionRequestHandler;
+ }
+
+ @Override
+ public void close() throws Exception {
+ synchronized (lock) {
+ // The running docker container and instruction handler should each only be terminated once.
+ // Do nothing if we have already requested termination.
+ if (!isClosed) {
+ isClosed = true;
+ this.instructionRequestHandler.close();
+ }
+ }
+ }
+}
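StaticRemoteEnvironment.close() uses a private lock plus an isClosed flag to guarantee the instruction handler is terminated at most once. An equivalent lock-free formulation (a sketch, under the assumption that no other state needs the lock) uses java.util.concurrent.atomic.AtomicBoolean:

    // compareAndSet(false, true) returns true for exactly one caller, so the
    // handler is closed once even if close() races across threads.
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() throws Exception {
      if (closed.compareAndSet(false, true)) {
        instructionRequestHandler.close();
      }
    }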
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
new file mode 100644
index 000000000000..5b3546204892
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+
+/**
+ * An {@link EnvironmentFactory} that creates a {@link StaticRemoteEnvironment} used by the
+ * Dataflow runner harness.
+ */
+public class StaticRemoteEnvironmentFactory implements EnvironmentFactory {
+ public static StaticRemoteEnvironmentFactory forService(
+ InstructionRequestHandler instructionRequestHandler) {
StaticRemoteEnvironmentFactory factory = new StaticRemoteEnvironmentFactory();
+ factory.setStaticServiceContent(instructionRequestHandler);
+ return factory;
+ }
+
+ private InstructionRequestHandler instructionRequestHandler;
+
+ private void setStaticServiceContent(InstructionRequestHandler instructionRequestHandler) {
+ this.instructionRequestHandler = instructionRequestHandler;
+ }
+
+ @Override
+ public RemoteEnvironment createEnvironment(Environment environment) {
+ return StaticRemoteEnvironment.create(environment, this.instructionRequestHandler);
+ }
+
+ /** Provider for StaticRemoteEnvironmentFactory. */
+ public static class Provider implements EnvironmentFactory.Provider {
+ private final InstructionRequestHandler instructionRequestHandler;
+
+ public Provider(InstructionRequestHandler instructionRequestHandler) {
+ this.instructionRequestHandler = instructionRequestHandler;
+ }
+
+ @Override
+ public EnvironmentFactory createEnvironmentFactory(
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ ControlClientPool clientPool,
+ IdGenerator idGenerator) {
+ return StaticRemoteEnvironmentFactory.forService(this.instructionRequestHandler);
+ }
+ }
+}
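Unlike the other EnvironmentFactory implementations, StaticRemoteEnvironmentFactory never launches a container: the Provider ignores all of the gRPC servers it is handed and every environment it creates shares the one pre-established InstructionRequestHandler. A minimal usage sketch (the handler and environment values are assumed to come from the surrounding runner code):

    // Sketch: attach a pre-existing control-plane handler to an environment.
    static RemoteEnvironment attach(
        Environment environment, InstructionRequestHandler handler) {
      EnvironmentFactory factory = StaticRemoteEnvironmentFactory.forService(handler);
      // The returned environment wraps the shared handler; closing it closes
      // the handler as well.
      return factory.createEnvironment(environment);
    }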
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 460c5b66258e..30e1bfef2d18 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -84,7 +84,7 @@ public void setup() throws Exception {
factory =
SingleEnvironmentInstanceJobBundleFactory.create(
- environmentFactory, dataServer, stateServer);
+ environmentFactory, dataServer, stateServer, null);
}
@After
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 172754)
Time Spent: 1.5h (was: 1h 20m)
> Dataflow portable runner harness should use ExecutableStage to process bundle
> -----------------------------------------------------------------------------
>
> Key: BEAM-6159
> URL: https://issues.apache.org/jira/browse/BEAM-6159
> Project: Beam
> Issue Type: Task
> Components: runner-dataflow
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)