[ 
https://issues.apache.org/jira/browse/BEAM-12222?focusedWorklogId=589401&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-589401
 ]

ASF GitHub Bot logged work on BEAM-12222:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/21 19:19
            Start Date: 26/Apr/21 19:19
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request #14641:
URL: https://github.com/apache/beam/pull/14641#discussion_r620542513



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -371,14 +371,27 @@ public void replaceChild(Node existing, Node replacement) {
      * extracts a component of a tuple, or other operations that occur at pipeline assembly time.
      */
     public boolean isCompositeNode() {
-      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+      if (isRootNode()) {
+        LOG.info("Composite because root node {}", this);

Review comment:
       Is this a model requirement?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -508,35 +477,21 @@ private AsMultimap() {}
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
-        Coder<K> keyCoder = kvCoder.getKeyCoder();
-        Coder<V> valueCoder = kvCoder.getValueCoder();
-        PCollectionView<Map<K, Iterable<V>>> view =
-            PCollectionViews.multimapView(
-                input,
-                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
-                (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
-      PCollection<KV<Void, KV<K, V>>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
+      // HACK: there are bugs in "composite" vs "primitive" transform distinction

Review comment:
       Ditto.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -350,29 +330,18 @@ private AsIterable() {}
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        Coder<T> inputCoder = input.getCoder();
-        PCollectionView<Iterable<T>> view =
-            PCollectionViews.iterableView(
-                input,
-                (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
-      PCollection<KV<Void, T>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
+      // HACK: there are bugs in "composite" vs "primitive" transform distinction

Review comment:
       Is there a JIRA that we can reference here?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -371,14 +371,27 @@ public void replaceChild(Node existing, Node replacement) {
      * extracts a component of a tuple, or other operations that occur at pipeline assembly time.
      */
     public boolean isCompositeNode() {
-      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+      if (isRootNode()) {
+        LOG.info("Composite because root node {}", this);
+        return true;
+      }
+      if (!parts.isEmpty()) {
+        LOG.info("Composite because has subtransforms {}", this);
+        return true;
+      }
+      if (returnsOthersOutput()) {
+        LOG.info("Composite because returns others output {}", this);

Review comment:
       When will this actually occur (just to clarify)?
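
       For readers following the hunk above, the three-way check can be
       sketched in isolation. This is only an illustration under stated
       assumptions: `CompositeCheckSketch` and its nested `Node` are
       hypothetical stand-ins, not Beam's `TransformHierarchy.Node`, and
       "returns others output" is modeled as a plain boolean flag rather
       than derived from the node's actual outputs.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeCheckSketch {

  /** Hypothetical stand-in for TransformHierarchy.Node; not Beam's real class. */
  static final class Node {
    private final Node enclosingNode; // null only for the root
    private final List<Node> parts = new ArrayList<>();
    // Assumption: modeled as a flag; the real check inspects the node's outputs.
    private final boolean returnsOthersOutput;

    Node(Node enclosingNode, boolean returnsOthersOutput) {
      this.enclosingNode = enclosingNode;
      this.returnsOthersOutput = returnsOthersOutput;
      if (enclosingNode != null) {
        enclosingNode.parts.add(this); // register as a subtransform of the parent
      }
    }

    boolean isRootNode() {
      return enclosingNode == null;
    }

    /** Returns the first matching reason, or null when the node is primitive. */
    String compositeReason() {
      if (isRootNode()) {
        return "root node";
      }
      if (!parts.isEmpty()) {
        return "has subtransforms";
      }
      if (returnsOthersOutput) {
        return "returns others output";
      }
      return null;
    }

    /** Same result as the original one-line disjunction of the three conditions. */
    boolean isCompositeNode() {
      return compositeReason() != null;
    }
  }

  public static void main(String[] args) {
    Node root = new Node(null, false);
    Node composite = new Node(root, false);
    Node primitive = new Node(composite, false);
    Node passThrough = new Node(root, true);

    System.out.println("root: " + root.compositeReason());
    System.out.println("composite: " + composite.compositeReason());
    System.out.println("primitive: " + primitive.compositeReason());
    System.out.println("passThrough: " + passThrough.compositeReason());
  }
}
```

       In this sketch the third branch is reached only by a non-root node
       with no subtransforms whose flag is set, which is the case the
       question above asks about.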




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 589401)
    Time Spent: 1h 50m  (was: 1h 40m)

> Dataflow side input translation "Unknown producer for value"
> ------------------------------------------------------------
>
>                 Key: BEAM-12222
>                 URL: https://issues.apache.org/jira/browse/BEAM-12222
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: P1
>             Fix For: 2.30.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> I have identified a seemingly nondeterministic issue in Dataflow translation, 
> where pipelines with side inputs are sometimes translated in the wrong order.
> {code}
> java.lang.NullPointerException: Unknown producer for value SimplePCollectionView{tag=Tag<org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:1221#4dca087078898728>} while translating step TfIdf.ComputeTfIdf/Combine.globally(Count)/ProduceDefault
>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1227)
> {code}
> Seen on 
> https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2_PR/32/testReport/junit/org.apache.beam.examples.complete/TfIdfIT/testE2ETfIdf/
> and on other changes as well. I think the change itself is just triggering the 
> nondeterministic problem, so there is a lurking problem with side inputs overall.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
