lukecwik commented on a change in pull request #11821:
URL: https://github.com/apache/beam/pull/11821#discussion_r440421252



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
##########
@@ -1302,13 +1303,35 @@ private GloballyAsSingletonView(
 
     @Override
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")

Review comment:
       `use_runner_v2` is preferred, but the internal test framework within
Google has not yet fully migrated to using one flag over the other. Also,
Googlers typically use `use_unified_worker`.
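
To make the suggestion concrete, here is a minimal sketch of a check that accepts either flag. It assumes the truncated condition in the hunk above continues with an OR on `use_unified_worker`. The class `RunnerV2Experiments` and its methods are hypothetical and model the experiments as a plain list of strings rather than using Beam's `PipelineOptions`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of Beam.
final class RunnerV2Experiments {
  // Flag names taken from the review discussion; treating them as
  // interchangeable is an assumption of this sketch.
  private static final List<String> RUNNER_V2_FLAGS =
      Arrays.asList("use_runner_v2", "use_unified_worker");

  private RunnerV2Experiments() {}

  // Returns true if the named experiment is present in the list.
  static boolean hasExperiment(List<String> experiments, String name) {
    return experiments != null && experiments.contains(name);
  }

  // Returns true only if beam_fn_api is enabled together with at least
  // one of the runner v2 flags.
  static boolean usesPortableRunnerV2(List<String> experiments) {
    if (!hasExperiment(experiments, "beam_fn_api")) {
      return false;
    }
    for (String flag : RUNNER_V2_FLAGS) {
      if (hasExperiment(experiments, flag)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> experiments = Arrays.asList("beam_fn_api", "use_unified_worker");
    System.out.println(usesPortableRunnerV2(experiments));
  }
}
```

Keeping the accepted flag names in one list means a later migration to a single flag would only touch that list.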

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
##########
@@ -71,8 +79,23 @@ public FlinkSideInputReader(
             tag.getId(), new SideInputInitializer<>(view));
     T result = sideInputs.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());

Review comment:
       In this case the side input is valid, but Flink signals that it has no
data for it by returning null. Also, `viewFn.apply` expects a non-null value,
which is why we pass in an empty view.
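
The fallback described above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the Flink runner's code: `EmptyViewFallback` is a hypothetical class, a `Map` stands in for the per-window side input store, and a `java.util.function.Function` over an empty map stands in for `ViewFn` applied to an empty multimap view.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the side input lookup; not Beam or Flink API.
final class EmptyViewFallback {
  private EmptyViewFallback() {}

  // Looks up the materialized value for a window. A null result means the
  // store has no data for that window, so the view function is applied to
  // an empty (but non-null) multimap instead.
  static <W, T> T get(
      Map<W, T> materialized,
      W window,
      Function<Map<String, List<String>>, T> viewFn) {
    T result = materialized.get(window);
    if (result == null) {
      result = viewFn.apply(Collections.<String, List<String>>emptyMap());
    }
    return result;
  }
}
```

The caller always receives whatever the view function defines as its value for empty input, so no null check is needed at the call site.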





