[ 
https://issues.apache.org/jira/browse/BEAM-3706?focusedWorklogId=89066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89066
 ]

ASF GitHub Bot logged work on BEAM-3706:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Apr/18 18:37
            Start Date: 09/Apr/18 18:37
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4924: [BEAM-3706] 
Removing side inputs from CombinePayload proto.
URL: https://github.com/apache/beam/pull/4924

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 2a1d98b0a20..c0bb262f929 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -348,15 +348,6 @@ message CombinePayload {
 
   // (Required) A reference to the Coder to use for accumulators of the CombineFn
   string accumulator_coder_id = 2;
-
-  // (Required) Additional pieces of context the DoFn may require that
-  // are not otherwise represented in the payload.
-  // (may force runners to execute the ParDo differently)
-  repeated Parameter parameters = 3;
-
-  // (Optional) A mapping of local input names to side inputs, describing
-  // the expected access pattern.
-  map<string, SideInput> side_inputs = 4;
 }
 
 // The payload for the test-only primitive TestStream
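
For reference, after this change the only fields left on CombinePayload are the
CombineFn and the accumulator coder id. A minimal Java sketch of building the
slimmed-down payload through the generated protobuf bindings (the builder calls
are the same generated accessors used in the CombineTranslation diff below; the
concrete values here are placeholders for illustration only):

    // Hedged sketch: the two remaining CombinePayload fields after this change.
    // "combineFnSpec" and the coder id string are illustrative placeholders.
    RunnerApi.SdkFunctionSpec combineFnSpec = RunnerApi.SdkFunctionSpec.getDefaultInstance();
    RunnerApi.CombinePayload payload =
        RunnerApi.CombinePayload.newBuilder()
            .setCombineFn(combineFnSpec)
            .setAccumulatorCoderId("accumulator-coder-id")
            .build();
    // parameters (field 3) and side_inputs (field 4) no longer exist on the message.
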
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 82f8dc51852..a9e0da15d2a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
@@ -27,18 +26,15 @@
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -51,9 +47,6 @@
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Methods for translating between {@link Combine.PerKey} {@link PTransform PTransforms} and {@link
@@ -81,10 +74,16 @@ public String getUrn(Combine.PerKey<?, ?, ?> transform) {
     public FunctionSpec translate(
         AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
         throws IOException {
-      return FunctionSpec.newBuilder()
-          .setUrn(COMBINE_TRANSFORM_URN)
-          .setPayload(payloadForCombine((AppliedPTransform) transform, components).toByteString())
-          .build();
+      if (transform.getTransform().getSideInputs().isEmpty()) {
+        return FunctionSpec.newBuilder()
+            .setUrn(COMBINE_TRANSFORM_URN)
+            .setPayload(payloadForCombine((AppliedPTransform) transform, components).toByteString())
+            .build();
+      } else {
+        // Combines with side inputs are translated as generic composites, which have a blank
+        // FunctionSpec.
+        return null;
+      }
     }
 
     @Override
@@ -123,8 +122,6 @@ public FunctionSpec translate(
     RunnerApi.SdkFunctionSpec getCombineFn();
 
     Coder<?> getAccumulatorCoder();
-
-    Map<String, RunnerApi.SideInput> getSideInputs();
   }
 
   /** Produces a {@link RunnerApi.CombinePayload} from a portable {@link CombineLike}. */
@@ -132,7 +129,6 @@ public FunctionSpec translate(
       CombineLike combine, SdkComponents components) throws IOException {
     return RunnerApi.CombinePayload.newBuilder()
         .setAccumulatorCoderId(components.registerCoder(combine.getAccumulatorCoder()))
-        .putAllSideInputs(combine.getSideInputs())
         .setCombineFn(combine.getCombineFn())
         .build();
   }
@@ -172,50 +168,10 @@ public SdkFunctionSpec getCombineFn() {
               throw new IllegalStateException(e);
             }
           }
-
-          @Override
-          public Map<String, SideInput> getSideInputs() {
-            Map<String, SideInput> sideInputs = new HashMap<>();
-            for (PCollectionView<?> sideInput : combine.getTransform().getSideInputs()) {
-              sideInputs.put(
-                  sideInput.getTagInternal().getId(),
-                  ParDoTranslation.translateView(sideInput, components));
-            }
-            return sideInputs;
-          }
         },
         components);
   }
 
-  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application)
-      throws IOException {
-    PTransform<?, ?> transform = application.getTransform();
-    if (transform instanceof Combine.PerKey) {
-      return ((Combine.PerKey<?, ?, ?>) transform).getSideInputs();
-    }
-
-    SdkComponents sdkComponents = SdkComponents.create();
-    RunnerApi.PTransform combineProto = PTransformTranslation.toProto(application, sdkComponents);
-    CombinePayload payload = CombinePayload.parseFrom(combineProto.getSpec().getPayload());
-
-    List<PCollectionView<?>> views = new ArrayList<>();
-    RehydratedComponents components =
-        RehydratedComponents.forComponents(sdkComponents.toComponents());
-    for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
-      String sideInputTag = sideInputEntry.getKey();
-      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
-      PCollection<?> originalPCollection =
-          checkNotNull(
-              (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
-              "no input with tag %s",
-              sideInputTag);
-      views.add(
-          PCollectionViewTranslation.viewFromProto(sideInput, sideInputTag, originalPCollection,
-              combineProto, components));
-    }
-    return views;
-  }
-
   private static class RawCombine<K, InputT, AccumT, OutputT>
       extends PTransformTranslation.RawPTransform<
           PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
@@ -275,43 +231,20 @@ public SdkFunctionSpec getCombineFn() {
     public Coder<?> getAccumulatorCoder() {
       return accumulatorCoder;
     }
-
-    @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
-      for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
-        try {
-          additionalInputs.put(
-              new TupleTag<>(sideInputEntry.getKey()),
-              rehydratedComponents.getPCollection(
-                  protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
-        } catch (IOException exc) {
-          throw new IllegalStateException(
-              String.format(
-                  "Could not find input with name %s for %s transform",
-                  sideInputEntry.getKey(), Combine.class.getSimpleName()));
-        }
-      }
-      return additionalInputs;
-    }
-
-    @Override
-    public Map<String, SideInput> getSideInputs() {
-      return payload.getSideInputsMap();
-    }
   }
 
   @VisibleForTesting
   static CombinePayload toProto(
       AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> combine, SdkComponents sdkComponents)
       throws IOException {
+    checkArgument(
+        combine.getTransform().getSideInputs().isEmpty(),
+        "CombineTranslation.toProto cannot translate Combines with side inputs.");
     GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
     try {
       Coder<?> accumulatorCoder = extractAccumulatorCoder(combineFn, (AppliedPTransform) combine);
-      Map<String, SideInput> sideInputs = new HashMap<>();
       return RunnerApi.CombinePayload.newBuilder()
           .setAccumulatorCoderId(sdkComponents.registerCoder(accumulatorCoder))
-          .putAllSideInputs(sideInputs)
           .setCombineFn(toProto(combineFn, sdkComponents))
           .build();
     } catch (CannotProvideCoderException e) {
@@ -359,34 +292,51 @@ public static SdkFunctionSpec toProto(
   public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> transform)
       throws IOException {
     SdkComponents sdkComponents = SdkComponents.create();
-    String id = getCombinePayload(transform, sdkComponents).getAccumulatorCoderId();
+    String id = getCombinePayload(transform, sdkComponents)
+        .map(CombinePayload::getAccumulatorCoderId)
+        .orElseThrow(() -> new IOException("Transform does not contain an AccumulatorCoder"));
     Components components = sdkComponents.toComponents();
     return CoderTranslation.fromProto(
         components.getCodersOrThrow(id), RehydratedComponents.forComponents(components));
   }
 
   public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) throws IOException {
-    checkArgument(payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN));
+    checkArgument(payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN),
+        "Payload URN was \"%s\", should have been \"%s\".",
+        payload.getCombineFn().getSpec().getUrn(),
+        JAVA_SERIALIZED_COMBINE_FN_URN);
     return (GlobalCombineFn<?, ?, ?>)
         SerializableUtils.deserializeFromByteArray(
             payload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
   }
 
-  public static GlobalCombineFn<?, ?, ?> getCombineFn(AppliedPTransform<?, ?, ?> transform)
+  public static Optional<GlobalCombineFn<?, ?, ?>> getCombineFn(
+      AppliedPTransform<?, ?, ?> transform)
       throws IOException {
-    return getCombineFn(getCombinePayload(transform));
+    Optional<CombinePayload> payload = getCombinePayload(transform);
+    if (payload.isPresent()) {
+      return Optional.of(getCombineFn(payload.get()));
+    } else {
+      return Optional.empty();
+    }
   }
 
-  private static CombinePayload getCombinePayload(AppliedPTransform<?, ?, ?> transform)
+  private static Optional<CombinePayload> getCombinePayload(AppliedPTransform<?, ?, ?> transform)
       throws IOException {
     return getCombinePayload(transform, SdkComponents.create());
   }
 
-  private static CombinePayload getCombinePayload(
+  private static Optional<CombinePayload> getCombinePayload(
       AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws IOException {
-    return CombinePayload.parseFrom(
-        PTransformTranslation.toProto(transform, Collections.emptyList(), components)
-            .getSpec()
-            .getPayload());
+    RunnerApi.PTransform proto = PTransformTranslation
+        .toProto(transform, Collections.emptyList(), components);
+
+    // Even if the proto has no spec, calling getSpec still returns a blank spec, which we want to
+    // avoid. It should be clear to the caller whether or not there was a spec in the transform.
+    if (proto.hasSpec()) {
+      return Optional.of(CombinePayload.parseFrom(proto.getSpec().getPayload()));
+    } else {
+      return Optional.empty();
+    }
   }
 }
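
A note on the two patterns in the file above: proto3 message getters never
return null, so getSpec() on a spec-less transform yields a blank FunctionSpec
rather than nothing, and hasSpec() is the only reliable presence check. A
hedged sketch of that behavior and of the resulting Optional-consuming call
site ("transform" is assumed to be an AppliedPTransform already in scope; this
is illustrative, not part of the PR):

    // Proto3 presence: an unset singular message field reads back as the
    // default instance, never null.
    RunnerApi.PTransform bare = RunnerApi.PTransform.newBuilder().build();
    boolean hasSpec = bare.hasSpec();              // false
    RunnerApi.FunctionSpec blank = bare.getSpec(); // default instance, not null

    // Callers of the new Optional-returning API unwrap explicitly:
    Optional<GlobalCombineFn<?, ?, ?>> maybeFn = CombineTranslation.getCombineFn(transform);
    if (maybeFn.isPresent()) {
      GlobalCombineFn<?, ?, ?> fn = maybeFn.get(); // translate as a primitive combine
    } else {
      // no CombinePayload: the combine was expanded as a generic composite
    }
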
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 74e482a10df..3c03b17ea18 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -196,10 +196,13 @@ private PTransformTranslation() {}
         transformBuilder.setSpec(spec);
       }
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
-      transformBuilder.setSpec(
+      FunctionSpec spec =
           KNOWN_PAYLOAD_TRANSLATORS
               .get(transform.getClass())
-              .translate(appliedPTransform, components));
+              .translate(appliedPTransform, components);
+      if (spec != null) {
+        transformBuilder.setSpec(spec);
+      }
     }
 
     return transformBuilder.build();
@@ -284,6 +287,7 @@ public static String urnForTransform(PTransform<?, ?> transform) {
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
     String getUrn(T transform);
 
+    @Nullable
     FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
         throws IOException;
 
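
With translate() now @Nullable, a payload translator may decline to produce a
FunctionSpec, and PTransformTranslation leaves the spec unset so the transform
serializes as a generic composite. A hypothetical translator illustrating the
contract (MyTransform, MY_URN, and isDirectlyTranslatable are invented names,
not part of the PR, and the interface may carry additional members not shown
in this hunk):

    // Hypothetical translator mirroring CombineTranslator's side-input check:
    // return null when the transform cannot be expressed as a primitive.
    class MyTranslator implements PTransformTranslation.TransformPayloadTranslator<MyTransform> {
      @Override
      public String getUrn(MyTransform transform) {
        return MY_URN;
      }

      @Override
      @Nullable
      public FunctionSpec translate(
          AppliedPTransform<?, ?, MyTransform> application, SdkComponents components)
          throws IOException {
        if (!isDirectlyTranslatable(application)) {
          return null; // spec stays unset; runners see a generic composite
        }
        return FunctionSpec.newBuilder().setUrn(MY_URN).build();
      }
    }
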
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
index 9083d5344c5..067540c0b62 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
@@ -46,6 +46,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
@@ -93,7 +94,7 @@ public void leaveCompositeTransform(Node node) {
             }
           });
       checkState(combine.get() != null);
-      assertEquals(combineFn, CombineTranslation.getCombineFn(combine.get()));
+      assertEquals(combineFn, CombineTranslation.getCombineFn(combine.get()).orElse(null));
 
       SdkComponents sdkComponents = SdkComponents.create();
       CombinePayload combineProto = CombineTranslation.toProto(combine.get(), sdkComponents);
@@ -111,14 +112,13 @@ public void leaveCompositeTransform(Node node) {
   @RunWith(JUnit4.class)
   public static class ValidateCombineWithContextTest {
     @Rule public TestPipeline pipeline = TestPipeline.create();
+    @Rule public ExpectedException exception = ExpectedException.none();
 
     @Test
-    public void testToFromProtoWithSideInputs() throws Exception {
+    public void testToFromProtoWithoutSideInputs() throws Exception {
       PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
-      final PCollectionView<Iterable<String>> sideInput =
-          pipeline.apply(Create.of("foo")).apply(View.asIterable());
       CombineFnWithContext<Integer, int[], Integer> combineFn = new TestCombineFnWithContext();
-      input.apply(Combine.globally(combineFn).withSideInputs(sideInput).withoutDefaults());
+      input.apply(Combine.globally(combineFn).withoutDefaults());
       final AtomicReference<AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>>> combine =
           new AtomicReference<>();
       pipeline.traverseTopologically(
@@ -132,7 +132,7 @@ public void leaveCompositeTransform(Node node) {
             }
           });
       checkState(combine.get() != null);
-      assertEquals(combineFn, CombineTranslation.getCombineFn(combine.get()));
+      assertEquals(combineFn, CombineTranslation.getCombineFn(combine.get()).orElse(null));
 
       SdkComponents sdkComponents = SdkComponents.create();
       CombinePayload combineProto = CombineTranslation.toProto(combine.get(), sdkComponents);
@@ -144,6 +144,40 @@ public void leaveCompositeTransform(Node node) {
               combineProto, RehydratedComponents.forComponents(componentsProto)));
       assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
     }
+
+    @Test
+    public void testToProtoWithSideInputsFails() throws Exception {
+      exception.expect(IllegalArgumentException.class);
+
+      PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+      final PCollectionView<Iterable<String>> sideInputs =
+          pipeline.apply(Create.of("foo")).apply(View.asIterable());
+
+      CombineFnWithContext<Integer, int[], Integer> combineFn = new TestCombineFnWithContext() {
+        @Override
+        public Integer extractOutput(int[] accumulator, Context c) {
+          Iterable<String> sideInput = c.sideInput(sideInputs);
+          return accumulator[0];
+        }
+      };
+
+      input.apply(Combine.globally(combineFn).withSideInputs(sideInputs).withoutDefaults());
+      final AtomicReference<AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>>> combine =
+          new AtomicReference<>();
+      pipeline.traverseTopologically(
+          new PipelineVisitor.Defaults() {
+            @Override
+            public void leaveCompositeTransform(Node node) {
+              if (node.getTransform() instanceof Combine.PerKey) {
+                checkState(combine.get() == null);
+                combine.set((AppliedPTransform) node.toAppliedPTransform(getPipeline()));
+              }
+            }
+          });
+
+      SdkComponents sdkComponents = SdkComponents.create();
+      CombinePayload payload = CombineTranslation.toProto(combine.get(), sdkComponents);
+    }
   }
 
   private static class TestCombineFn extends Combine.CombineFn<Integer, Void, Void> {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index 5d14f8cef19..dbd5c691900 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -28,6 +28,7 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -72,8 +73,10 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
         if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
             PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
           try {
-            GlobalCombineFn fn = CombineTranslation.getCombineFn(application);
-            return isApplicable(application.getInputs(), fn);
+            Optional<GlobalCombineFn<?, ?, ?>> fn = CombineTranslation.getCombineFn(application);
+            if (fn.isPresent()) {
+              return isApplicable(application.getInputs(), fn.get());
+            }
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
@@ -141,7 +144,13 @@ private Factory() {}
                     PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>
                 transform) {
       try {
-        GlobalCombineFn<?, ?, ?> globalFn = CombineTranslation.getCombineFn(transform);
+        GlobalCombineFn<?, ?, ?> globalFn = CombineTranslation.getCombineFn(transform).orElseThrow(
+            () -> new IOException(String
+                .format("%s.matcher() should only match %s instances using %s, but %s was missing",
+                    MultiStepCombine.class.getSimpleName(),
+                    PerKey.class.getSimpleName(),
+                    CombineFn.class.getSimpleName(),
+                    CombineFn.class.getSimpleName())));
         checkState(
             globalFn instanceof CombineFn,
             "%s.matcher() should only match %s instances using %s, got %s",
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 7898fc122e5..26f8e387dc2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -362,8 +362,9 @@ public void translateNode(
 
       CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
       try {
-            combineFn = (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) CombineTranslation
-                .getCombineFn(context.getCurrentTransform());
+        combineFn = (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) CombineTranslation
+            .getCombineFn(context.getCurrentTransform())
+            .orElseThrow(() -> new IOException("CombineFn not found in node."));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -393,19 +394,6 @@ public void translateNode(
       Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
           inputDataSet.groupBy(new KvKeySelector<>(inputCoder.getKeyCoder()));
 
-      // construct a map from side input to WindowingStrategy so that
-      // the DoFn runner can map main-input windows to side input windows
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
-      List<PCollectionView<?>> sideInputs;
-      try {
-        sideInputs = CombineTranslation.getSideInputs(context.getCurrentTransform());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      for (PCollectionView<?> sideInput: sideInputs) {
-        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
-      }
-
       WindowingStrategy<Object, BoundedWindow> boundedStrategy =
           (WindowingStrategy<Object, BoundedWindow>) windowingStrategy;
 
@@ -416,14 +404,14 @@ public void translateNode(
             new FlinkPartialReduceFunction<>(
                 combineFn,
                 boundedStrategy,
-                sideInputStrategies,
+                new HashMap<>(),
                 context.getPipelineOptions());
 
         FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
             new FlinkReduceFunction<>(
                 combineFn,
                 boundedStrategy,
-                sideInputStrategies,
+                new HashMap<>(),
                 context.getPipelineOptions());
 
         // Partially GroupReduce the values into the intermediate format AccumT (combine)
@@ -436,8 +424,6 @@ public void translateNode(
                 partialReduceFunction,
                 "GroupCombine: " + fullName);
 
-        transformSideInputs(sideInputs, groupCombine, context);
-
         TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
             context.getTypeInfo(context.getOutput(transform));
 
@@ -450,8 +436,6 @@ public void translateNode(
             new GroupReduceOperator<>(
                 intermediateGrouping, reduceTypeInfo, reduceFunction, fullName);
 
-        transformSideInputs(sideInputs, outputDataSet, context);
-
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
       } else {
@@ -461,7 +445,7 @@ public void translateNode(
 
         RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>>
             reduceFunction = new FlinkMergingNonShuffleReduceFunction<>(
-                combineFn, boundedStrategy, sideInputStrategies, context.getPipelineOptions());
+                combineFn, boundedStrategy, new HashMap<>(), context.getPipelineOptions());
 
         TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
             context.getTypeInfo(context.getOutput(transform));
@@ -474,8 +458,6 @@ public void translateNode(
             WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
             new GroupReduceOperator<>(grouping, reduceTypeInfo, reduceFunction, fullName);
 
-        transformSideInputs(sideInputs, outputDataSet, context);
-
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
       }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 74ca168a17f..59ccecd649f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -104,7 +104,6 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
@@ -849,30 +848,6 @@ public void translateNode(
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
       PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
 
-    @Override
-    boolean canTranslate(
-        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
-        FlinkStreamingTranslationContext context) {
-
-      // if we have a merging window strategy and side inputs we cannot
-      // translate as a proper combine. We have to group and then run the combine
-      // over the final grouped values.
-      PCollection<KV<K, InputT>> input = context.getInput(transform);
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
-
-      boolean hasNoSideInputs;
-      try {
-        hasNoSideInputs = CombineTranslation.getSideInputs(context.getCurrentTransform()).isEmpty();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-
-      return windowingStrategy.getWindowFn().isNonMerging() || hasNoSideInputs;
-    }
-
     @Override
     public void translateNode(
         PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
@@ -915,7 +890,8 @@ public void translateNode(
       GlobalCombineFn<? super InputT, ?, OutputT> combineFn;
       try {
         combineFn = (GlobalCombineFn<? super InputT, ?, OutputT>)
-            CombineTranslation.getCombineFn(context.getCurrentTransform());
+            CombineTranslation.getCombineFn(context.getCurrentTransform())
+                .orElseThrow(() -> new IOException("CombineFn not found in node."));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -929,84 +905,29 @@ public void translateNode(
       TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      List<PCollectionView<?>> sideInputs;
-      try {
-        sideInputs = CombineTranslation.getSideInputs(context.getCurrentTransform());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-
-      if (sideInputs.isEmpty()) {
-        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
-        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
-            new WindowDoFnOperator<>(
-                reduceFn,
-                fullName,
-                (Coder) windowedWorkItemCoder,
-                mainTag,
-                Collections.emptyList(),
-                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
-                windowingStrategy,
-                new HashMap<>(), /* side-input mapping */
-                Collections.emptyList(), /* side inputs */
-                context.getPipelineOptions(),
-                inputKvCoder.getKeyCoder());
-
-        // our operator expects WindowedValue<KeyedWorkItem> while our input stream
-        // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
-        @SuppressWarnings("unchecked")
-        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
-            keyedWorkItemStream
-                .transform(fullName, outputTypeInfo, (OneInputStreamOperator) doFnOperator)
-                .uid(fullName);
-        context.setOutputDataStream(context.getOutput(transform), outDataStream);
-      } else {
-        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
-            transformSideInputs(sideInputs, context);
+      TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+      WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+          new WindowDoFnOperator<>(
+              reduceFn,
+              fullName,
+              (Coder) windowedWorkItemCoder,
+              mainTag,
+              Collections.emptyList(),
+              new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
+              windowingStrategy,
+              new HashMap<>(), /* side-input mapping */
+              Collections.emptyList(), /* side inputs */
+              context.getPipelineOptions(),
+              inputKvCoder.getKeyCoder());
 
-        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
-        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
-            new WindowDoFnOperator<>(
-                reduceFn,
-                fullName,
-                (Coder) windowedWorkItemCoder,
-                mainTag,
-                Collections.emptyList(),
-                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
-                windowingStrategy,
-                transformSideInputs.f0,
-                sideInputs,
-                context.getPipelineOptions(),
-                inputKvCoder.getKeyCoder());
-
-        // we have to manually construct the two-input transform because we're not
-        // allowed to have only one input keyed, normally.
-
-        TwoInputTransformation<
-            WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
-            RawUnionValue,
-            WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
-            keyedWorkItemStream.getTransformation(),
-            transformSideInputs.f1.broadcast().getTransformation(),
-            fullName,
-            (TwoInputStreamOperator) doFnOperator,
-            outputTypeInfo,
-            keyedWorkItemStream.getParallelism());
-
-        rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
-        rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
-
-        @SuppressWarnings({ "unchecked", "rawtypes" })
-        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
-            new SingleOutputStreamOperator(
-                keyedWorkItemStream.getExecutionEnvironment(),
-                rawFlinkTransform) {}; // we have to cheat around the ctor being protected
-        outDataStream.uid(fullName);
-
-        keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
-
-        context.setOutputDataStream(context.getOutput(transform), outDataStream);
-      }
+      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+      @SuppressWarnings("unchecked")
+      SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+          keyedWorkItemStream
+              .transform(fullName, outputTypeInfo, (OneInputStreamOperator) doFnOperator)
+              .uid(fullName);
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
     }
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 6aaaf59ece6..316765a49b9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -48,6 +48,7 @@
 
   protected final SerializablePipelineOptions serializedOptions;
 
+  // TODO: Remove side input functionality since liftable Combines no longer have side inputs.
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
   public FlinkPartialReduceFunction(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 871f8db9e81..20cc4588495 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -46,6 +46,7 @@
 
   protected final WindowingStrategy<Object, W> windowingStrategy;
 
+  // TODO: Remove side input functionality since liftable Combines no longer have side inputs.
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
   protected final SerializablePipelineOptions serializedOptions;

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 89066)
    Time Spent: 3.5h  (was: 3h 20m)

> Update CombinePayload to improved model for Portability
> -------------------------------------------------------
>
>                 Key: BEAM-3706
>                 URL: https://issues.apache.org/jira/browse/BEAM-3706
>             Project: Beam
>          Issue Type: Sub-task
>          Components: beam-model
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: Minor
>              Labels: portability
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> This will mean changing the proto definition in beam_runner_api, most likely
> trimming out fields that are no longer necessary and adding any new ones that
> could be useful. The majority of the work will probably be in investigating
> whether some existing fields can actually be removed (SideInputs and
> Parameters, for example).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
