This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new afe0793302c Add a workaround for BEAM-20873 for optimized list side inputs. (#31163)
afe0793302c is described below

commit afe0793302cbe89d57adbcbabbe958f0d5c7d8e2
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri May 3 10:25:59 2024 -0700

    Add a workaround for BEAM-20873 for optimized list side inputs. (#31163)
    
    This triggered a case with Dataflow Runner v1 and PTransformOverrides
    that exposed https://github.com/apache/beam/issues/20873 .
---
 .../beam/runners/dataflow/worker/IsmSideInputReader.java    |  4 +++-
 .../src/main/java/org/apache/beam/sdk/transforms/View.java  | 13 ++++++++++---
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 30602444153..c2e8c334a09 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -70,6 +70,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
+import org.apache.beam.sdk.values.PCollectionViews.IterableBackedListViewFn;
 import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
 import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
@@ -354,7 +355,8 @@ public class IsmSideInputReader implements SideInputReader {
                   if (viewFn instanceof IterableViewFn
                       || viewFn instanceof IterableViewFn2
                       || viewFn instanceof ListViewFn
-                      || viewFn instanceof ListViewFn2) {
+                      || viewFn instanceof ListViewFn2
+                      || viewFn instanceof IterableBackedListViewFn) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getListForWindow(tag, window);
                     return viewT;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index ca04542b372..16084c569f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -286,11 +286,18 @@ public class View {
     }
 
     private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
+      Coder<T> inputCoder = input.getCoder();
+      // HACK to work around https://github.com/apache/beam/issues/20873:
+      // There are bugs in "composite" vs "primitive" transform distinction
+      // in TransformHierachy. This noop transform works around them and should be zero
+      // cost.
+      PCollection<T> materializationInput =
+          input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
-              input,
-              (TypeDescriptorSupplier<T>) input.getCoder()::getEncodedTypeDescriptor,
-              input.getWindowingStrategy());
+              materializationInput,
+              (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
+              materializationInput.getWindowingStrategy());
       input.apply(CreatePCollectionView.of(view));
       return view;
     }

Reply via email to