[
https://issues.apache.org/jira/browse/BEAM-6194?focusedWorklogId=173044&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173044
]
ASF GitHub Bot logged work on BEAM-6194:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/18 19:23
Start Date: 07/Dec/18 19:23
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #7219: [BEAM-6194] Follow
up with cleanup for PR7015
URL: https://github.com/apache/beam/pull/7219
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 1d1c56cb80f3..e5323552d61c 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -82,6 +82,7 @@
import
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
+import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -97,6 +98,8 @@
private final EnvironmentType environmentType;
+ private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
private ReferenceRunner(
Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType
environmentType) {
this.pipeline = executable(p);
@@ -197,7 +200,8 @@ public void execute() throws Exception {
EnvironmentFactory environmentFactory =
createEnvironmentFactory(control, logging, artifact, provisioning,
controlClientPool);
JobBundleFactory jobBundleFactory =
- SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory,
data, state, null);
+ SingleEnvironmentInstanceJobBundleFactory.create(
+ environmentFactory, data, state, idGenerator);
TransformEvaluatorRegistry transformRegistry =
TransformEvaluatorRegistry.portableRegistry(
@@ -238,12 +242,7 @@ private EnvironmentFactory createEnvironmentFactory(
case DOCKER:
return new
DockerEnvironmentFactory.Provider(PipelineOptionsTranslation.fromProto(options))
.createEnvironmentFactory(
- control,
- logging,
- artifact,
- provisioning,
- controlClient,
- IdGenerators.incrementingLongs());
+ control, logging, artifact, provisioning, controlClient,
idGenerator);
case IN_PROCESS:
return EmbeddedEnvironmentFactory.create(
PipelineOptionsFactory.create(), logging, control,
controlClient.getSource());
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index 2dfaac966827..d90d66d220c9 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -50,6 +50,7 @@
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
@@ -111,7 +112,7 @@ public void setup() throws Exception {
bundleFactory = ImmutableListBundleFactory.create();
JobBundleFactory jobBundleFactory =
SingleEnvironmentInstanceJobBundleFactory.create(
- environmentFactory, dataServer, stateServer, null);
+ environmentFactory, dataServer, stateServer,
IdGenerators.incrementingLongs());
factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7db53391fc57..78bba56cb8cd 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -148,26 +148,6 @@ public DataflowMapTaskExecutor create(
Networks.replaceDirectedNetworkNodes(
network, createOutputReceiversTransform(stageName, counterSet));
- // Swap out all the RegisterFnRequest nodes with Operation nodes
- Networks.replaceDirectedNetworkNodes(
- network,
- createOperationTransformForRegisterFnNodes(
- idGenerator,
- instructionRequestHandler,
- grpcStateFnServer.getService(),
- stageName,
- executionContext));
-
- // Swap out all the RemoteGrpcPort nodes with Operation nodes, note that
it is expected
- // that the RegisterFnRequest nodes have already been replaced.
- Networks.replaceDirectedNetworkNodes(
- network,
- createOperationTransformForGrpcPortNodes(
- network,
- grpcDataFnServer.getService(),
- // TODO: Set NameContext properly for these operations.
- executionContext.createOperationContext(
- NameContext.create(stageName, stageName, stageName,
stageName))));
if (DataflowRunner.hasExperiment(
options.as(DataflowPipelineDebugOptions.class),
"use_executable_stage_bundle_execution")) {
LOG.debug("Using SingleEnvironmentInstanceJobBundleFactory");
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index 3d44331dd5e5..ae8a13ebd541 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -35,9 +35,8 @@
/**
* This {@link
org.apache.beam.runners.dataflow.worker.util.common.worker.Operation} is
responsible
* for communicating with the SDK harness and asking it to process a bundle of
work. This operation
- * request a RemoteBundle{@link
org.apache.beam.runners.fnexecution.control.RemoteBundle}, send data
- * elements to SDK and receive processed results from SDK, then pass these
elements to next
- * Operations.
+ * requests a {@link
org.apache.beam.runners.fnexecution.control.RemoteBundle}, sends elements to
+ * SDK and receive processed results from SDK, passing these elements
downstream.
*/
public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
private static final Logger LOG =
LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ab8cd0b8596a..c93e9e909cb8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -82,8 +82,9 @@
import
org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.InvalidProtocolBufferException;
/**
- * Converts a {@link Network} representation of {@link MapTask} destined for
the SDK harness into an
- * {@link Node} containing {@link
org.apache.beam.runners.core.construction.graph.ExecutableStage}.
+ * Converts a {@link Network} representation of {@link MapTask} destined for
the SDK harness into a
+ * {@link Node} containing an {@link
+ * org.apache.beam.runners.core.construction.graph.ExecutableStage}.
*/
public class CreateExecutableStageNodeFunction
implements Function<MutableNetwork<Node, Edge>, Node> {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 6b12b219bd0e..182ad50b8579 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -173,9 +173,9 @@ public void testCreateMapTaskExecutor() throws Exception {
try (DataflowMapTaskExecutor executor =
mapTaskExecutorFactory.create(
null /* beamFnControlClientHandler */,
- null /* beamFnDataService */,
- null /* beamFnStateService */,
- null,
+ null /* GrpcFnServer<GrpcDataService> */,
+ null /* ApiServiceDescriptor */,
+ null, /* GrpcFnServer<GrpcStateService> */
mapTaskToNetwork.apply(mapTask),
options,
STAGE,
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 23b752bdf2c1..b7d0c58446b9 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -35,7 +35,6 @@
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
@@ -78,11 +77,7 @@ private SingleEnvironmentInstanceJobBundleFactory(
this.environmentFactory = environmentFactory;
this.dataService = dataService;
this.stateService = stateService;
- if (idGenerator != null) {
- this.idGenerator = idGenerator;
- } else {
- this.idGenerator = IdGenerators.incrementingLongs();
- }
+ this.idGenerator = idGenerator;
}
@Override
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
index 655a2e716fc4..79c68ab87071 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
@@ -30,7 +30,6 @@ static StaticRemoteEnvironment create(
return new StaticRemoteEnvironment(environment, instructionRequestHandler);
}
- private final Object lock = new Object();
private final Environment environment;
private final InstructionRequestHandler instructionRequestHandler;
@@ -53,14 +52,13 @@ public InstructionRequestHandler
getInstructionRequestHandler() {
}
@Override
- public void close() throws Exception {
- synchronized (lock) {
- // The running docker container and instruction handler should each only
be terminated once.
- // Do nothing if we have already requested termination.
- if (!isClosed) {
- isClosed = true;
- this.instructionRequestHandler.close();
- }
+ public synchronized void close() throws Exception {
+ // The instruction handler should each only be terminated once.
+ // Do nothing if we have already requested termination.
+ if (!isClosed) {
+ return;
}
+ isClosed = true;
+ this.instructionRequestHandler.close();
}
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
index 5b3546204892..29a0721ca45a 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
@@ -28,20 +28,18 @@
import org.apache.beam.sdk.fn.IdGenerator;
/**
- * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by
Dataflow runner
- * harness.
+ * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by
a runner harness that
+ * would like to use an existing InstructionRequestHandler.
*/
public class StaticRemoteEnvironmentFactory implements EnvironmentFactory {
public static StaticRemoteEnvironmentFactory forService(
InstructionRequestHandler instructionRequestHandler) {
- StaticRemoteEnvironmentFactory factory = new
StaticRemoteEnvironmentFactory();
- factory.setStaticServiceContent(instructionRequestHandler);
- return factory;
+ return new StaticRemoteEnvironmentFactory(instructionRequestHandler);
}
- private InstructionRequestHandler instructionRequestHandler;
+ private final InstructionRequestHandler instructionRequestHandler;
- private void setStaticServiceContent(InstructionRequestHandler
instructionRequestHandler) {
+ private StaticRemoteEnvironmentFactory(InstructionRequestHandler
instructionRequestHandler) {
this.instructionRequestHandler = instructionRequestHandler;
}
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 30e1bfef2d18..a795e3c33340 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -47,6 +47,7 @@
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.transforms.Create;
import org.junit.After;
@@ -84,7 +85,7 @@ public void setup() throws Exception {
factory =
SingleEnvironmentInstanceJobBundleFactory.create(
- environmentFactory, dataServer, stateServer, null);
+ environmentFactory, dataServer, stateServer,
IdGenerators.incrementingLongs());
}
@After
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 173044)
Time Spent: 10m
Remaining Estimate: 0h
> Address comments in PR7015
> --------------------------
>
> Key: BEAM-6194
> URL: https://issues.apache.org/jira/browse/BEAM-6194
> Project: Beam
> Issue Type: Sub-task
> Components: runner-dataflow
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)