taegeonum closed pull request #148: [NEMO-260] Beam Accumulator-based Partial 
Aggregation
URL: https://github.com/apache/incubator-nemo/pull/148
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it once the PR is merged:

diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 9118d983b..7a22ba8a9 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -26,6 +26,8 @@
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.nemo.common.ir.edge.IREdge;
+import 
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
@@ -279,8 +281,52 @@ private static void flattenTranslator(final 
PipelineTranslationContext ctx,
     final PipelineTranslationContext ctx,
     final TransformHierarchy.Node beamNode,
     final PTransform<?, ?> transform) {
-    // TODO #260: Beam Accumulator-based Partial Aggregation
-    return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+
+    // Check if the partial combining optimization can be applied.
+    // If not, simply use the default Combine implementation by entering into 
it.
+    if (!isGlobalWindow(beamNode, ctx.getPipeline())) {
+      // TODO #263: Partial Combining for Beam Streaming
+      return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+    }
+    final Combine.PerKey perKey = (Combine.PerKey) transform;
+    if (!perKey.getSideInputs().isEmpty()) {
+      // TODO #264: Partial Combining with Beam SideInputs
+      return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    // This Combine can be optimized as the following sequence of Nemo 
IRVertices.
+    // Combine Input -> Combine(Partial Combine -> KV<InputT, AccumT> -> Final 
Combine) -> Combine Output
+    final CombineFnBase.GlobalCombineFn combineFn = perKey.getFn();
+
+    // (Step 1) To Partial Combine
+    final IRVertex partialCombine = new OperatorVertex(new 
CombineFnPartialTransform<>(combineFn));
+    ctx.addVertex(partialCombine);
+    beamNode.getInputs().values().forEach(input -> 
ctx.addEdgeTo(partialCombine, input));
+
+    // (Step 2) To Final Combine
+    final PCollection input = (PCollection) Iterables.getOnlyElement(
+      
TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
+    final KvCoder inputCoder = (KvCoder) input.getCoder();
+    final Coder accumulatorCoder;
+    try {
+      accumulatorCoder =
+        combineFn.getAccumulatorCoder(ctx.getPipeline().getCoderRegistry(), 
inputCoder.getValueCoder());
+    } catch (CannotProvideCoderException e) {
+      throw new RuntimeException(e);
+    }
+    final IRVertex finalCombine = new OperatorVertex(new 
CombineFnFinalTransform<>(combineFn));
+    ctx.addVertex(finalCombine);
+    final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, 
partialCombine, finalCombine);
+    ctx.addEdgeTo(
+      edge,
+      KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+      input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    // (Step 3) To Combine Output
+    beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, finalCombine, output));
+
+    // This composite transform has been translated in its entirety.
+    return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
   }
 
   /**
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
new file mode 100644
index 000000000..d7f0e3e6a
--- /dev/null
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnFinalTransform.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Merges all of the partially accumulated KVs(Key, Accum) into KVs(Key, Output).
+ * Accumulators are merged per key as elements arrive, and the final outputs are
+ * emitted only when the transform is closed.
+ * (Currently supports batch-style global windows only)
+ * TODO #263: Partial Combining for Beam Streaming
+ * TODO #264: Partial Combining with Beam SideInputs
+ * @param <K> Key type.
+ * @param <A> Accum type.
+ * @param <O> Output type.
+ */
+public final class CombineFnFinalTransform<K, A, O>
+  extends NoWatermarkEmitTransform<WindowedValue<KV<K, A>>, WindowedValue<KV<K, O>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CombineFnFinalTransform.class.getName());
+  // The merged accumulator of each key seen so far.
+  private final Map<K, A> keyToAccumulator;
+  private OutputCollector<WindowedValue<KV<K, O>>> outputCollector;
+
+  // null arguments when calling methods of this variable, since we don't support sideinputs yet.
+  private final GlobalCombineFnRunner<?, A, O> combineFnRunner;
+
+  /**
+   * Constructor.
+   * @param combineFn the combine function used to merge accumulators and extract outputs.
+   */
+  public CombineFnFinalTransform(final CombineFnBase.GlobalCombineFn<?, A, O> combineFn) {
+    this.combineFnRunner = GlobalCombineFnRunners.create(combineFn);
+    this.keyToAccumulator = new HashMap<>();
+  }
+
+  /**
+   * Remembers the output collector that {@link #close()} emits to.
+   * @param context the transform context (unused).
+   * @param oc the collector for the final outputs.
+   */
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<K, O>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  /**
+   * Merges the accumulator carried by the element into the accumulator kept for its key.
+   * @param element a windowed KV of a key and a partial accumulator.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, A>> element) {
+    final K key = element.getValue().getKey();
+    final A accum = element.getValue().getValue();
+
+    // Create the initial accumulator only for a key that has none yet,
+    // instead of building a throwaway accumulator for every element.
+    final A existing = keyToAccumulator.get(key);
+    final A accumulatorForThisKey = existing != null
+      ? existing
+      : combineFnRunner.createAccumulator(null, null, null);
+
+    // Update the accumulator (merge), always keeping the accumulator returned by the runner.
+    keyToAccumulator.put(
+      key,
+      combineFnRunner.mergeAccumulators(
+        Arrays.asList(accumulatorForThisKey, accum), null, null, null));
+  }
+
+  /**
+   * Extracts the output of every key from its merged accumulator and emits it in the global window.
+   */
+  @Override
+  public void close() {
+    final Iterator<Map.Entry<K, A>> iterator = keyToAccumulator.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<K, A> entry = iterator.next();
+      final K key = entry.getKey();
+      final A accum = entry.getValue();
+      final O output = combineFnRunner.extractOutput(accum, null, null, null);
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(key, output)));
+      iterator.remove(); // for eager garbage collection
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    // Use this class's own name so the final combine is distinguishable from the partial combine.
+    sb.append("CombineFnFinalTransform:");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
new file mode 100644
index 000000000..dbe5acd4d
--- /dev/null
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineFnPartialTransform.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Partially accumulates the given KVs(Key, Input) into KVs(Key, Accum).
+ * Inputs are folded into a per-key accumulator as they arrive, and the
+ * accumulators are emitted only when the transform is closed.
+ * (Currently supports batch-style global windows only)
+ * TODO #263: Partial Combining for Beam Streaming
+ * TODO #264: Partial Combining with Beam SideInputs
+ * @param <K> Key type.
+ * @param <I> Input type.
+ * @param <A> Accum type.
+ */
+public final class CombineFnPartialTransform<K, I, A>
+  extends NoWatermarkEmitTransform<WindowedValue<KV<K, I>>, WindowedValue<KV<K, A>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CombineFnPartialTransform.class.getName());
+  // The accumulator of each key seen so far.
+  private final Map<K, A> keyToAccumulator;
+  private OutputCollector<WindowedValue<KV<K, A>>> outputCollector;
+
+  // null arguments when calling methods of this variable, since we don't support sideinputs yet.
+  private final GlobalCombineFnRunner<I, A, ?> combineFnRunner;
+
+  /**
+   * Constructor.
+   * @param combineFn the combine function used to create, update, and compact accumulators.
+   */
+  public CombineFnPartialTransform(final CombineFnBase.GlobalCombineFn<I, A, ?> combineFn) {
+    this.combineFnRunner = GlobalCombineFnRunners.create(combineFn);
+    this.keyToAccumulator = new HashMap<>();
+  }
+
+  /**
+   * Remembers the output collector that {@link #close()} emits to.
+   * @param context the transform context (unused).
+   * @param oc the collector for the partial accumulators.
+   */
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<K, A>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  /**
+   * Adds the value of the element to the accumulator kept for its key.
+   * @param element a windowed KV of a key and an input value.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, I>> element) {
+    final K key = element.getValue().getKey();
+    final I val = element.getValue().getValue();
+
+    // Create the initial accumulator only for a key that has none yet,
+    // instead of building a throwaway accumulator for every element.
+    final A existing = keyToAccumulator.get(key);
+    final A accumulatorForThisKey = existing != null
+      ? existing
+      : combineFnRunner.createAccumulator(null, null, null);
+
+    // Update the accumulator. This must be put(), not putIfAbsent(): addInput may return
+    // a new accumulator instead of mutating the given one, and putIfAbsent would discard it
+    // because the key is already present, silently losing the input.
+    keyToAccumulator.put(
+      key,
+      combineFnRunner.addInput(accumulatorForThisKey, val, null, null, null));
+  }
+
+  /**
+   * Compacts the accumulator of every key and emits it, paired with its key, in the global window.
+   */
+  @Override
+  public void close() {
+    final Iterator<Map.Entry<K, A>> iterator = keyToAccumulator.entrySet().iterator();
+    while (iterator.hasNext()) {
+      final Map.Entry<K, A> entry = iterator.next();
+      final K key = entry.getKey();
+      final A accum = entry.getValue();
+      final A compactAccum = combineFnRunner.compact(accum, null, null, null);
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(KV.of(key, compactAccum)));
+      iterator.remove(); // for eager garbage collection
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("CombineFnPartialTransform:");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index fde90a9a1..be3f0083c 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -36,8 +36,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class BeamFrontendALSTest {
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testALSDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index d52d13c72..632c30aff 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -36,8 +36,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public class BeamFrontendMLRTest {
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testMLRDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
index 8b23349f9..426976d7a 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
@@ -49,8 +49,7 @@ public void setUp() throws Exception {
     compiledDAG = CompilerTestUtil.compileALSDAG();
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testTransientResourcePass() throws Exception {
     final DAG<IRVertex, IREdge> processedDAG = new 
TransientResourceCompositePass().apply(compiledDAG);
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
index 6f8ed04d6..4af6214c0 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPassTest.java
@@ -44,8 +44,7 @@ public void setUp() throws Exception {
     compiledDAG = CompilerTestUtil.compileALSDAG();
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testLoopGrouping() {
     final DAG<IRVertex, IREdge> processedDAG = new 
LoopExtractionPass().apply(compiledDAG);
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
index 29ef1d7e8..d1a66b8bb 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionALSInefficientTest.java
@@ -46,8 +46,7 @@ public void setUp() throws Exception {
     groupedDAG = new LoopExtractionPass().apply(inefficientALSDAG);
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testForInefficientALSDAG() throws Exception {
     final long expectedNumOfVertices = groupedDAG.getVertices().size() + 3;
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index d6b3085d1..bdc2a859d 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -105,8 +105,7 @@ public void setUp() throws Exception {
     dagToBeRefactored = builder.build();
   }
 
-  // TODO #260: Beam Accumulator-based Partial Aggregation
-  // @Test
+  @Test
   public void testLoopInvariantCodeMotionPass() throws Exception {
     final long numberOfGroupedVertices = groupedDAG.getVertices().size();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to