kennknowles commented on a change in pull request #14641:
URL: https://github.com/apache/beam/pull/14641#discussion_r620594252



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -371,14 +371,27 @@ public void replaceChild(Node existing, Node replacement) 
{
      * extracts a component of a tuple, or other operations that occur at 
pipeline assembly time.
      */
     public boolean isCompositeNode() {
-      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+      if (isRootNode()) {
+        LOG.info("Composite because root node {}", this);

Review comment:
       No this seems to be a heuristic. It caused real problems for me because 
I didn't realize it. Especially the "returns others output" clause.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -371,14 +371,27 @@ public void replaceChild(Node existing, Node replacement) 
{
      * extracts a component of a tuple, or other operations that occur at 
pipeline assembly time.
      */
     public boolean isCompositeNode() {
-      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+      if (isRootNode()) {
+        LOG.info("Composite because root node {}", this);
+        return true;
+      }
+      if (!parts.isEmpty()) {
+        LOG.info("Composite because has subtransforms {}", this);
+        return true;
+      }
+      if (returnsOthersOutput()) {
+        LOG.info("Composite because returns others output {}", this);

Review comment:
       Yea, it can. This is why `CreateDataflowView.forBatch` was considered a 
composite. Because it materializes its input and then returns its input. So it 
returns another transform's output and the traversal thinks it is a composite. 
That is why I had to add the extra `materializatoinInput = 
input.apply(<identity>)` bit.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -350,29 +330,18 @@ private AsIterable() {}
         throw new IllegalStateException("Unable to create a side-input view 
from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable 
runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), 
"use_unified_worker"))) {
-        Coder<T> inputCoder = input.getCoder();
-        PCollectionView<Iterable<T>> view =
-            PCollectionViews.iterableView(
-                input,
-                (TypeDescriptorSupplier<T>) 
inputCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
-      PCollection<KV<Void, T>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
+      // HACK: there are bugs in "composite" vs "primitive" transform 
distinction

Review comment:
       Filed https://issues.apache.org/jira/browse/BEAM-12228

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
##########
@@ -508,35 +477,21 @@ private AsMultimap() {}
         throw new IllegalStateException("Unable to create a side-input view 
from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable 
runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), 
"use_unified_worker"))) {
-        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
-        Coder<K> keyCoder = kvCoder.getKeyCoder();
-        Coder<V> valueCoder = kvCoder.getValueCoder();
-        PCollectionView<Map<K, Iterable<V>>> view =
-            PCollectionViews.multimapView(
-                input,
-                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
-                (TypeDescriptorSupplier<V>) 
valueCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
-      PCollection<KV<Void, KV<K, V>>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
+      // HACK: there are bugs in "composite" vs "primitive" transform 
distinction

Review comment:
       https://issues.apache.org/jira/browse/BEAM-12228




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to