[ 
https://issues.apache.org/jira/browse/BEAM-6194?focusedWorklogId=173044&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173044
 ]

ASF GitHub Bot logged work on BEAM-6194:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Dec/18 19:23
            Start Date: 07/Dec/18 19:23
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7219: [BEAM-6194] Follow 
up with cleanup for PR7015
URL: https://github.com/apache/beam/pull/7219
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 1d1c56cb80f3..e5323552d61c 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -82,6 +82,7 @@
 import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -97,6 +98,8 @@
 
   private final EnvironmentType environmentType;
 
+  private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
   private ReferenceRunner(
       Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType 
environmentType) {
     this.pipeline = executable(p);
@@ -197,7 +200,8 @@ public void execute() throws Exception {
       EnvironmentFactory environmentFactory =
           createEnvironmentFactory(control, logging, artifact, provisioning, 
controlClientPool);
       JobBundleFactory jobBundleFactory =
-          SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, 
data, state, null);
+          SingleEnvironmentInstanceJobBundleFactory.create(
+              environmentFactory, data, state, idGenerator);
 
       TransformEvaluatorRegistry transformRegistry =
           TransformEvaluatorRegistry.portableRegistry(
@@ -238,12 +242,7 @@ private EnvironmentFactory createEnvironmentFactory(
       case DOCKER:
         return new 
DockerEnvironmentFactory.Provider(PipelineOptionsTranslation.fromProto(options))
             .createEnvironmentFactory(
-                control,
-                logging,
-                artifact,
-                provisioning,
-                controlClient,
-                IdGenerators.incrementingLongs());
+                control, logging, artifact, provisioning, controlClient, 
idGenerator);
       case IN_PROCESS:
         return EmbeddedEnvironmentFactory.create(
             PipelineOptionsFactory.create(), logging, control, 
controlClient.getSource());
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index 2dfaac966827..d90d66d220c9 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -50,6 +50,7 @@
 import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -111,7 +112,7 @@ public void setup() throws Exception {
     bundleFactory = ImmutableListBundleFactory.create();
     JobBundleFactory jobBundleFactory =
         SingleEnvironmentInstanceJobBundleFactory.create(
-            environmentFactory, dataServer, stateServer, null);
+            environmentFactory, dataServer, stateServer, 
IdGenerators.incrementingLongs());
     factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7db53391fc57..78bba56cb8cd 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -148,26 +148,6 @@ public DataflowMapTaskExecutor create(
     Networks.replaceDirectedNetworkNodes(
         network, createOutputReceiversTransform(stageName, counterSet));
 
-    // Swap out all the RegisterFnRequest nodes with Operation nodes
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForRegisterFnNodes(
-            idGenerator,
-            instructionRequestHandler,
-            grpcStateFnServer.getService(),
-            stageName,
-            executionContext));
-
-    // Swap out all the RemoteGrpcPort nodes with Operation nodes, note that 
it is expected
-    // that the RegisterFnRequest nodes have already been replaced.
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForGrpcPortNodes(
-            network,
-            grpcDataFnServer.getService(),
-            // TODO: Set NameContext properly for these operations.
-            executionContext.createOperationContext(
-                NameContext.create(stageName, stageName, stageName, 
stageName))));
     if (DataflowRunner.hasExperiment(
         options.as(DataflowPipelineDebugOptions.class), 
"use_executable_stage_bundle_execution")) {
       LOG.debug("Using SingleEnvironmentInstanceJobBundleFactory");
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index 3d44331dd5e5..ae8a13ebd541 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -35,9 +35,8 @@
 /**
  * This {@link 
org.apache.beam.runners.dataflow.worker.util.common.worker.Operation} is 
responsible
  * for communicating with the SDK harness and asking it to process a bundle of 
work. This operation
- * request a RemoteBundle{@link 
org.apache.beam.runners.fnexecution.control.RemoteBundle}, send data
- * elements to SDK and receive processed results from SDK, then pass these 
elements to next
- * Operations.
+ * requests a {@link 
org.apache.beam.runners.fnexecution.control.RemoteBundle}, sends elements to
+ * SDK and receive processed results from SDK, passing these elements 
downstream.
  */
 public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ab8cd0b8596a..c93e9e909cb8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -82,8 +82,9 @@
 import 
org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Converts a {@link Network} representation of {@link MapTask} destined for 
the SDK harness into an
- * {@link Node} containing {@link 
org.apache.beam.runners.core.construction.graph.ExecutableStage}.
+ * Converts a {@link Network} representation of {@link MapTask} destined for 
the SDK harness into a
+ * {@link Node} containing an {@link
+ * org.apache.beam.runners.core.construction.graph.ExecutableStage}.
  */
 public class CreateExecutableStageNodeFunction
     implements Function<MutableNetwork<Node, Edge>, Node> {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 6b12b219bd0e..182ad50b8579 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -173,9 +173,9 @@ public void testCreateMapTaskExecutor() throws Exception {
     try (DataflowMapTaskExecutor executor =
         mapTaskExecutorFactory.create(
             null /* beamFnControlClientHandler */,
-            null /* beamFnDataService */,
-            null /* beamFnStateService */,
-            null,
+            null /* GrpcFnServer<GrpcDataService> */,
+            null /* ApiServiceDescriptor */,
+            null, /* GrpcFnServer<GrpcStateService> */
             mapTaskToNetwork.apply(mapTask),
             options,
             STAGE,
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 23b752bdf2c1..b7d0c58446b9 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -35,7 +35,6 @@
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -78,11 +77,7 @@ private SingleEnvironmentInstanceJobBundleFactory(
     this.environmentFactory = environmentFactory;
     this.dataService = dataService;
     this.stateService = stateService;
-    if (idGenerator != null) {
-      this.idGenerator = idGenerator;
-    } else {
-      this.idGenerator = IdGenerators.incrementingLongs();
-    }
+    this.idGenerator = idGenerator;
   }
 
   @Override
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
index 655a2e716fc4..79c68ab87071 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
@@ -30,7 +30,6 @@ static StaticRemoteEnvironment create(
     return new StaticRemoteEnvironment(environment, instructionRequestHandler);
   }
 
-  private final Object lock = new Object();
   private final Environment environment;
   private final InstructionRequestHandler instructionRequestHandler;
 
@@ -53,14 +52,13 @@ public InstructionRequestHandler 
getInstructionRequestHandler() {
   }
 
   @Override
-  public void close() throws Exception {
-    synchronized (lock) {
-      // The running docker container and instruction handler should each only 
be terminated once.
-      // Do nothing if we have already requested termination.
-      if (!isClosed) {
-        isClosed = true;
-        this.instructionRequestHandler.close();
-      }
+  public synchronized void close() throws Exception {
+    // The instruction handler should each only be terminated once.
+    // Do nothing if we have already requested termination.
+    if (!isClosed) {
+      return;
     }
+    isClosed = true;
+    this.instructionRequestHandler.close();
   }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
index 5b3546204892..29a0721ca45a 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
@@ -28,20 +28,18 @@
 import org.apache.beam.sdk.fn.IdGenerator;
 
 /**
- * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by 
Dataflow runner
- * harness.
+ * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by 
a runner harness that
+ * would like to use an existing InstructionRequestHandler.
  */
 public class StaticRemoteEnvironmentFactory implements EnvironmentFactory {
   public static StaticRemoteEnvironmentFactory forService(
       InstructionRequestHandler instructionRequestHandler) {
-    StaticRemoteEnvironmentFactory factory = new 
StaticRemoteEnvironmentFactory();
-    factory.setStaticServiceContent(instructionRequestHandler);
-    return factory;
+    return new StaticRemoteEnvironmentFactory(instructionRequestHandler);
   }
 
-  private InstructionRequestHandler instructionRequestHandler;
+  private final InstructionRequestHandler instructionRequestHandler;
 
-  private void setStaticServiceContent(InstructionRequestHandler 
instructionRequestHandler) {
+  private StaticRemoteEnvironmentFactory(InstructionRequestHandler 
instructionRequestHandler) {
     this.instructionRequestHandler = instructionRequestHandler;
   }
 
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 30e1bfef2d18..a795e3c33340 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -47,6 +47,7 @@
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.junit.After;
@@ -84,7 +85,7 @@ public void setup() throws Exception {
 
     factory =
         SingleEnvironmentInstanceJobBundleFactory.create(
-            environmentFactory, dataServer, stateServer, null);
+            environmentFactory, dataServer, stateServer, 
IdGenerators.incrementingLongs());
   }
 
   @After


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 173044)
            Time Spent: 10m
    Remaining Estimate: 0h

> Address comments in PR7015
> --------------------------
>
>                 Key: BEAM-6194
>                 URL: https://issues.apache.org/jira/browse/BEAM-6194
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-dataflow
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to