[ 
https://issues.apache.org/jira/browse/BEAM-5653?focusedWorklogId=154423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154423
 ]

ASF GitHub Bot logged work on BEAM-5653:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Oct/18 19:45
            Start Date: 15/Oct/18 19:45
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #6649: [BEAM-5653] Fix 
overriding coders due to duplicate coderId generation
URL: https://github.com/apache/beam/pull/6649
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index 128034ad88a..92592100245 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -41,7 +41,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -190,27 +189,17 @@ public Node apply(MutableNetwork<Node, Edge> input) {
             .setId(idGenerator.get())
             .setStateApiServiceDescriptor(stateApiServiceDescriptor);
 
-    // Seed the ProcessBundleDescriptor with the bits from the pipeline.
-    // We do _not_ seed the transforms, as a ProcessBundleDescriptor will execute all transforms
-    // so misc. client-side transforms are meaningless.
-    processBundleDescriptor
-        .putAllCoders(pipeline.getComponents().getCodersMap())
-        .putAllPcollections(pipeline.getComponents().getPcollectionsMap())
-        .putAllWindowingStrategies(pipeline.getComponents().getWindowingStrategiesMap());
-
     // For intermediate PCollections we fabricate, we make a bogus WindowingStrategy
     // TODO: create a correct windowing strategy, including coders and environment
     // An SdkFunctionSpec is invalid without a working environment reference. We can revamp that
     // when we inline SdkFunctionSpec and FunctionSpec, both slated for inlining wherever they occur
-    SdkComponents sdkComponents = SdkComponents.create();
-    // Attempt to use the environment supplied by the pipeline, otherwise default to use
-    // the Java environment.
-    try {
-      sdkComponents.registerEnvironment(
-          Iterables.getOnlyElement(pipeline.getComponents().getEnvironmentsMap().values()));
-    } catch (NoSuchElementException e) {
+    SdkComponents sdkComponents = SdkComponents.create(pipeline.getComponents());
+
+    // Default to use the Java environment if pipeline doesn't have environment specified.
+    if (pipeline.getComponents().getEnvironmentsMap().isEmpty()) {
       sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
     }
+
     String fakeWindowingStrategyId = "fakeWindowingStrategy" + idGenerator.get();
     try {
       RunnerApi.MessageWithComponents fakeWindowingStrategyProto =
@@ -236,7 +225,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
         Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
       InstructionOutput instructionOutput = node.getInstructionOutput();
 
-      String coderId = idGenerator.get();
+      String coderId = "generatedCoder" + idGenerator.get();
       try (ByteString.Output output = ByteString.newOutput()) {
         try {
           Coder<?> javaCoder =
@@ -271,7 +260,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
             e);
       }
 
-      String pcollectionId = idGenerator.get();
+      String pcollectionId = "generatedPcollection" + idGenerator.get();
       processBundleDescriptor.putPcollections(
           pcollectionId,
           RunnerApi.PCollection.newBuilder()
@@ -285,7 +274,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
     for (ParallelInstructionNode node :
         Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
       ParallelInstruction parallelInstruction = node.getParallelInstruction();
-      String ptransformId = idGenerator.get();
+      String ptransformId = "generatedPtransform" + idGenerator.get();
       ptransformIdToNameContexts.put(
           ptransformId,
           NameContext.create(
@@ -394,7 +383,8 @@ public Node apply(MutableNetwork<Node, Edge> input) {
       }
 
       for (Node predecessorOutput : input.predecessors(node)) {
-        pTransform.putInputs(idGenerator.get(), nodesToPCollections.get(predecessorOutput));
+        pTransform.putInputs(
+            "generatedInput" + idGenerator.get(), nodesToPCollections.get(predecessorOutput));
       }
 
       for (Edge edge : input.outEdges(node)) {


 



Issue Time Tracking
-------------------

            Worklog Id:     (was: 154423)
            Time Spent: 4.5h  (was: 4h 20m)
    Remaining Estimate: 67.5h  (was: 67h 40m)

> Dataflow FnApi Worker overrides some of Coders due to coder ID generation 
> collision.
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-5653
>                 URL: https://issues.apache.org/jira/browse/BEAM-5653
>             Project: Beam
>          Issue Type: Test
>          Components: java-fn-execution
>            Reporter: Mikhail Gryzykhin
>            Assignee: Mikhail Gryzykhin
>            Priority: Blocker
>             Fix For: 2.8.0
>
>   Original Estimate: 72h
>          Time Spent: 4.5h
>  Remaining Estimate: 67.5h
>
> Due to one of the latest refactorings, the Java FnApi Worker has a bug: it 
> overrides Coders in the ProcessBundleDescriptor sent to the SDK Harness, which 
> causes jobs to fail.
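
The collision described above can be illustrated with a simplified sketch. This is not the worker code: plain string maps stand in for the coder map of the ProcessBundleDescriptor, the counter-based counterIdGenerator and the registerGenerated helper are hypothetical names, and the sketch assumes that pipeline-supplied ids can look like bare generated ids. Under those assumptions, a bare generated id silently replaces a pipeline entry, while a prefixed id (as in the "generatedCoder" change in the diff) lands in its own slot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class CoderIdCollisionSketch {

  /** Hypothetical stand-in for the worker's id generator: a plain counter. */
  static Supplier<String> counterIdGenerator() {
    long[] next = {0};
    return () -> Long.toString(next[0]++);
  }

  /**
   * Copies the pipeline-supplied coders, then registers two worker-generated
   * coders whose ids are prefix + counter. Returns the combined map.
   */
  static Map<String, String> registerGenerated(
      Map<String, String> pipelineCoders, String prefix) {
    Map<String, String> coders = new HashMap<>(pipelineCoders);
    Supplier<String> idGenerator = counterIdGenerator();
    for (int i = 0; i < 2; i++) {
      String coderId = prefix + idGenerator.get();
      // Map.put replaces any existing entry with the same id without warning.
      coders.put(coderId, "workerCoder" + i);
    }
    return coders;
  }

  public static void main(String[] args) {
    // Assumption: the pipeline already uses ids that look like bare counters.
    Map<String, String> pipelineCoders = new HashMap<>();
    pipelineCoders.put("0", "pipelineCoderA");
    pipelineCoders.put("1", "pipelineCoderB");

    Map<String, String> bare = registerGenerated(pipelineCoders, "");
    Map<String, String> prefixed = registerGenerated(pipelineCoders, "generatedCoder");

    System.out.println("bare ids:     " + bare);
    System.out.println("prefixed ids: " + prefixed);
  }
}
```

With bare ids the two pipeline coders are replaced by the worker's coders; with the prefix all four entries survive. A prefix only avoids collisions as long as pipeline ids do not use the same prefix, which is presumably why the diff also seeds SdkComponents from the pipeline components instead of starting from an empty SdkComponents.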



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
