This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new afe0793302c Add a workaround for BEAM-20873 for optimized list side inputs. (#31163)
afe0793302c is described below
commit afe0793302cbe89d57adbcbabbe958f0d5c7d8e2
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri May 3 10:25:59 2024 -0700
Add a workaround for BEAM-20873 for optimized list side inputs. (#31163)
This triggered a case with Dataflow Runner v1 and PTransformOverrides
that exposed https://github.com/apache/beam/issues/20873 .
---
.../beam/runners/dataflow/worker/IsmSideInputReader.java | 4 +++-
.../src/main/java/org/apache/beam/sdk/transforms/View.java | 13 ++++++++++---
2 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 30602444153..c2e8c334a09 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -70,6 +70,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
+import org.apache.beam.sdk.values.PCollectionViews.IterableBackedListViewFn;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
@@ -354,7 +355,8 @@ public class IsmSideInputReader implements SideInputReader {
if (viewFn instanceof IterableViewFn
|| viewFn instanceof IterableViewFn2
|| viewFn instanceof ListViewFn
- || viewFn instanceof ListViewFn2) {
+ || viewFn instanceof ListViewFn2
+ || viewFn instanceof IterableBackedListViewFn) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getListForWindow(tag, window);
return viewT;
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index ca04542b372..16084c569f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -286,11 +286,18 @@ public class View {
}
private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
+ Coder<T> inputCoder = input.getCoder();
+ // HACK to work around https://github.com/apache/beam/issues/20873:
+ // There are bugs in "composite" vs "primitive" transform distinction
+ // in TransformHierarchy. This noop transform works around them and should be zero
+ // cost.
+ PCollection<T> materializationInput =
+ input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
PCollectionView<List<T>> view =
PCollectionViews.listView(
- input,
- (TypeDescriptorSupplier<T>) input.getCoder()::getEncodedTypeDescriptor,
- input.getWindowingStrategy());
+ materializationInput,
+ (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
+ materializationInput.getWindowingStrategy());
input.apply(CreatePCollectionView.of(view));
return view;
}