[ 
https://issues.apache.org/jira/browse/BEAM-12222?focusedWorklogId=589419&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-589419
 ]

ASF GitHub Bot logged work on BEAM-12222:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/21 19:45
            Start Date: 26/Apr/21 19:45
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on a change in pull request #14641:
URL: https://github.com/apache/beam/pull/14641#discussion_r620594252



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -371,14 +371,27 @@ public void replaceChild(Node existing, Node replacement) {
      * extracts a component of a tuple, or other operations that occur at pipeline assembly time.
      */
     public boolean isCompositeNode() {
-      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+      if (isRootNode()) {
+        LOG.info("Composite because root node {}", this);

Review comment:
       No, this seems to be a heuristic. It caused real problems for me because 
I didn't realize that, especially the "returns others output" clause.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -371,14 +371,27 @@ public void replaceChild(Node existing, Node replacement) {
      * extracts a component of a tuple, or other operations that occur at pipeline assembly time.
      */
     public boolean isCompositeNode() {
-      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+      if (isRootNode()) {
+        LOG.info("Composite because root node {}", this);
+        return true;
+      }
+      if (!parts.isEmpty()) {
+        LOG.info("Composite because has subtransforms {}", this);
+        return true;
+      }
+      if (returnsOthersOutput()) {
+        LOG.info("Composite because returns others output {}", this);

Review comment:
       Yes, it can. This is why `CreateDataflowView.forBatch` was considered a 
composite: it materializes its input and then returns its input. So it 
returns another transform's output and the traversal thinks it is a composite. 
That is why I had to add the extra `materializationInput = 
input.apply(<identity>)` bit.
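
To make the "returns others output" clause concrete, here is a minimal sketch of the heuristic using a toy model. It is not Beam's `TransformHierarchy`: the `Node` class, the `PRODUCERS` registry, and the value names are assumptions made up for illustration. It only shows why a transform that hands back its own input gets classified as composite, and why routing the input through an identity step (so the transform returns a value it produced itself) avoids that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the composite heuristic; these are NOT Beam's real classes. */
final class CompositeHeuristicSketch {

  /** Hypothetical registry: value name -> node that first produced it. */
  static final Map<String, Node> PRODUCERS = new HashMap<>();

  static final class Node {
    final String name;
    final boolean root;
    final List<Node> parts = new ArrayList<>();
    final List<String> outputs = new ArrayList<>();

    Node(String name, boolean root) {
      this.name = name;
      this.root = root;
    }

    /** Records an output; the first node to output a value counts as its producer. */
    void setOutput(String value) {
      outputs.add(value);
      PRODUCERS.putIfAbsent(value, this);
    }

    /** True if any output was first produced by a different node. */
    boolean returnsOthersOutput() {
      for (String value : outputs) {
        if (PRODUCERS.get(value) != this) {
          return true;
        }
      }
      return false;
    }

    /** Same three clauses as the one-line version in the diff above. */
    boolean isCompositeNode() {
      return !parts.isEmpty() || root || returnsOthersOutput();
    }
  }

  /** A leaf transform that returns its input unchanged. */
  static boolean passThroughIsComposite() {
    PRODUCERS.clear();
    Node read = new Node("Read", false);
    read.setOutput("pc");
    Node view = new Node("MaterializeAndReturnInput", false);
    view.setOutput("pc"); // same value that "Read" already produced
    return view.isCompositeNode();
  }

  /** The same leaf transform, but returning a value it produced itself. */
  static boolean freshOutputIsComposite() {
    PRODUCERS.clear();
    Node read = new Node("Read", false);
    read.setOutput("pc");
    Node view = new Node("MaterializeViaIdentity", false);
    view.setOutput("pc.identity"); // new value, produced by this node
    return view.isCompositeNode();
  }

  public static void main(String[] args) {
    System.out.println(passThroughIsComposite()); // prints "true"
    System.out.println(freshOutputIsComposite()); // prints "false"
  }
}
```

In this model the pass-through node has no subtransforms and is not the root, yet it is still reported as composite, which is the surprise described in the comment above.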

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -350,29 +330,18 @@ private AsIterable() {}
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        Coder<T> inputCoder = input.getCoder();
-        PCollectionView<Iterable<T>> view =
-            PCollectionViews.iterableView(
-                input,
-                (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
-      PCollection<KV<Void, T>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
+      // HACK: there are bugs in "composite" vs "primitive" transform distinction

Review comment:
       Filed https://issues.apache.org/jira/browse/BEAM-12228

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -508,35 +477,21 @@ private AsMultimap() {}
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
-        Coder<K> keyCoder = kvCoder.getKeyCoder();
-        Coder<V> valueCoder = kvCoder.getValueCoder();
-        PCollectionView<Map<K, Iterable<V>>> view =
-            PCollectionViews.multimapView(
-                input,
-                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
-                (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
-      PCollection<KV<Void, KV<K, V>>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
+      // HACK: there are bugs in "composite" vs "primitive" transform distinction

Review comment:
       https://issues.apache.org/jira/browse/BEAM-12228






Issue Time Tracking
-------------------

    Worklog Id:     (was: 589419)
    Time Spent: 2h 10m  (was: 2h)

> Dataflow side input translation "Unknown producer for value"
> ------------------------------------------------------------
>
>                 Key: BEAM-12222
>                 URL: https://issues.apache.org/jira/browse/BEAM-12222
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: P1
>             Fix For: 2.30.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> I have identified a seemingly nondeterministic issue in Dataflow translation, 
> where pipelines with side inputs sometimes are translated in the wrong order.
> {code}
> java.lang.NullPointerException: Unknown producer for value 
> SimplePCollectionView{tag=Tag<org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:1221#4dca087078898728>}
>  while translating step 
> TfIdf.ComputeTfIdf/Combine.globally(Count)/ProduceDefault
>       at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1227)
> {code}
> Seen on 
> https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2_PR/32/testReport/junit/org.apache.beam.examples.complete/TfIdfIT/testE2ETfIdf/
>  and also on other changes. I think the change itself is just triggering the 
> nondeterministic problem.
> So there is a lurking problem with side inputs overall.
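
The failure mode described in the issue can be sketched as follows. This is a toy model under stated assumptions, not the Dataflow translator: the `Step` class, the `translate` method, and the value name `view` are hypothetical, and `Objects.requireNonNull` stands in for the Guava `checkNotNull` seen in the stack trace. It only illustrates how visiting a side-input consumer before the step that produces the view leads to an "Unknown producer for value" error.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Toy model of order-dependent translation; NOT the Dataflow translator. */
final class ProducerOrderSketch {

  /** Hypothetical step: may read one side input and may produce one output value. */
  static final class Step {
    final String name;
    final String sideInput; // null if the step reads no side input
    final String output;    // null if the step produces nothing we track

    Step(String name, String sideInput, String output) {
      this.name = name;
      this.sideInput = sideInput;
      this.output = output;
    }
  }

  /** Translates steps in the given order; returns "ok" or the failure message. */
  static String translate(List<Step> steps) {
    Map<String, String> producers = new HashMap<>(); // value -> producing step name
    try {
      for (Step step : steps) {
        if (step.sideInput != null) {
          // Fails if the consumer is visited before the producer registered the value.
          Objects.requireNonNull(
              producers.get(step.sideInput),
              "Unknown producer for value " + step.sideInput
                  + " while translating step " + step.name);
        }
        if (step.output != null) {
          producers.put(step.output, step.name);
        }
      }
      return "ok";
    } catch (NullPointerException e) {
      return e.getMessage();
    }
  }

  static final Step CREATE_VIEW = new Step("CreateView", null, "view");
  static final Step PRODUCE_DEFAULT = new Step("ProduceDefault", "view", null);

  /** Producer is visited first, so the lookup succeeds. */
  static String producerFirst() {
    return translate(Arrays.asList(CREATE_VIEW, PRODUCE_DEFAULT));
  }

  /** Consumer is visited first, so the lookup fails. */
  static String consumerFirst() {
    return translate(Arrays.asList(PRODUCE_DEFAULT, CREATE_VIEW));
  }

  public static void main(String[] args) {
    System.out.println(producerFirst());
    System.out.println(consumerFirst());
  }
}
```

The same two steps succeed or fail purely depending on visiting order, which is consistent with the report that the triggering change only exposes an ordering problem that was already there.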



