Repository: apex-malhar
Updated Branches:
  refs/heads/master a202cdc7c -> fc8b674e3


APEXMALHAR-2428 CompositeAccumulation for windowed operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e035d61f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e035d61f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e035d61f

Branch: refs/heads/master
Commit: e035d61fe651afed51c67613374de455ae5febe0
Parents: eaa3bf3
Author: brightchen <[email protected]>
Authored: Fri Jan 27 12:37:07 2017 -0800
Committer: brightchen <[email protected]>
Committed: Tue Feb 28 11:16:11 2017 -0800

----------------------------------------------------------------------
 .../accumulation/CompositeAccumulation.java     | 119 +++++++++++++++++++
 .../accumulation/CompositeAccumulationTest.java |  67 +++++++++++
 2 files changed, 186 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e035d61f/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java
new file mode 100644
index 0000000..2184388
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An {@link Accumulation} that delegates every operation to an ordered list of sub accumulations.
+ *
+ * The accumulated value of the composite is a {@code List} holding one entry per sub accumulation,
+ * stored at the position at which that sub accumulation was added. Sub accumulations are registered
+ * with {@link #addAccumulation(Accumulation)}; the returned {@link AccumulationTag} is later passed to
+ * {@link #getSubOutput(AccumulationTag, List)} to read the output of that particular sub accumulation.
+ */
+@SuppressWarnings("rawtypes")
+@Evolving
+public class CompositeAccumulation<InputT> implements Accumulation<InputT, List, List>
+{
+  /**
+   * Opaque handle for one registered sub accumulation. It wraps the position of the sub accumulation
+   * inside the composite; the constructor is private, so instances are only created by
+   * {@link CompositeAccumulation#addAccumulation(Accumulation)}.
+   */
+  public static class AccumulationTag
+  {
+    private int index;
+
+    private AccumulationTag(int index)
+    {
+      this.index = index;
+    }
+  }
+
+  // Sub accumulations in registration order; position i matches entry i of the accumulated-value list.
+  private List<Accumulation<InputT, Object, ?>> accumulations = Lists.newArrayList();
+
+  /**
+   * Registers a sub accumulation at the end of the composite.
+   *
+   * @param accumulation The sub accumulation to add to the composite.
+   * @return The tag identifying the added sub accumulation; pass it to
+   *         {@link #getSubOutput(AccumulationTag, List)} to obtain that sub accumulation's output.
+   */
+  public AccumulationTag addAccumulation(Accumulation<InputT, Object, ?> accumulation)
+  {
+    // The new element lands at the current size, which therefore becomes its tag index.
+    int position = accumulations.size();
+    accumulations.add(accumulation);
+    return new AccumulationTag(position);
+  }
+
+  /**
+   * Computes the output of a single sub accumulation from the composite output.
+   *
+   * @param tag The tag of the sub accumulation, as returned by {@link #addAccumulation(Accumulation)}.
+   * @param output The output of the composite accumulation, as returned by {@link #getOutput(List)}.
+   * @return The result of the sub accumulation's {@code getOutput} applied to its entry in {@code output}.
+   */
+  public Object getSubOutput(AccumulationTag tag, List output)
+  {
+    int position = tag.index;
+    Accumulation<InputT, Object, ?> sub = accumulations.get(position);
+    Object subAccumulated = output.get(position);
+    return sub.getOutput(subAccumulated);
+  }
+
+  /**
+   * Builds a new list containing the default accumulated value of every sub accumulation,
+   * in registration order.
+   */
+  @Override
+  public List defaultAccumulatedValue()
+  {
+    List seeds = Lists.newArrayList();
+    for (int position = 0; position < accumulations.size(); ++position) {
+      seeds.add(accumulations.get(position).defaultAccumulatedValue());
+    }
+    return seeds;
+  }
+
+  /**
+   * Feeds the input to every sub accumulation and updates the given list in place.
+   *
+   * @param accumulatedValues The per-sub-accumulation values; modified by this call.
+   * @param input The input to accumulate.
+   * @return The same list instance that was passed in.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  public List accumulate(List accumulatedValues, InputT input)
+  {
+    int position = 0;
+    for (Accumulation sub : accumulations) {
+      Object before = accumulatedValues.get(position);
+      Object after = sub.accumulate(before, input);
+      // Only write back when the sub accumulation handed out a different instance.
+      if (after != before) {
+        accumulatedValues.set(position, after);
+      }
+      ++position;
+    }
+    return accumulatedValues;
+  }
+
+  /**
+   * Merges the two lists entry by entry using the matching sub accumulation and stores each result
+   * into the first list.
+   *
+   * @param accumulatedValues1 The first list of accumulated values; modified by this call.
+   * @param accumulatedValues2 The second list of accumulated values.
+   * @return The first list instance, now holding the merged values.
+   */
+  @Override
+  public List merge(List accumulatedValues1, List accumulatedValues2)
+  {
+    int position = 0;
+    for (Accumulation<InputT, Object, ?> sub : accumulations) {
+      Object merged = sub.merge(accumulatedValues1.get(position), accumulatedValues2.get(position));
+      accumulatedValues1.set(position, merged);
+      ++position;
+    }
+    return accumulatedValues1;
+  }
+
+  /**
+   * Returns the accumulated list unchanged; use {@link #getSubOutput(AccumulationTag, List)} to obtain
+   * the output of an individual sub accumulation.
+   */
+  @Override
+  public List getOutput(List accumulatedValues)
+  {
+    return accumulatedValues;
+  }
+
+  /**
+   * Returns the given list unchanged.
+   */
+  @Override
+  public List getRetraction(List values)
+  {
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e035d61f/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
new file mode 100644
index 0000000..fed1acb
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.SumAccumulation;
+import 
org.apache.apex.malhar.lib.window.accumulation.CompositeAccumulation.AccumulationTag;
+
+public class CompositeAccumulationTest
+{
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testIncremental()
+  {
+    CompositeAccumulation<Long> accumulations = new CompositeAccumulation<>();
+    AccumulationTag sumTag = accumulations.addAccumulation((Accumulation)new 
SumAccumulation());
+    AccumulationTag countTag = accumulations.addAccumulation((Accumulation)new 
Count());
+    AccumulationTag maxTag = accumulations.addAccumulation(new Max());
+    AccumulationTag minTag = accumulations.addAccumulation(new Min());
+    List values = accumulations.defaultAccumulatedValue();
+    for (long i = 1; i <= 10; i++) {
+      values = accumulations.accumulate(values, i);
+    }
+
+    List outputValues = accumulations.getOutput(values);
+    Assert.assertTrue((Long)accumulations.getSubOutput(sumTag, outputValues) 
== 55L);
+    Assert.assertTrue((Long)accumulations.getSubOutput(countTag, outputValues) 
== 10L);
+    Assert.assertTrue((Long)accumulations.getSubOutput(maxTag, outputValues) 
== 10L);
+    Assert.assertTrue((Long)accumulations.getSubOutput(minTag, outputValues) 
== 1L);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testAverage()
+  {
+    CompositeAccumulation<Double> accumulations = new 
CompositeAccumulation<>();
+    AccumulationTag averageTag = 
accumulations.addAccumulation((Accumulation)new Average());
+    List values = accumulations.defaultAccumulatedValue();
+    for (int i = 1; i <= 10; i++) {
+      values = accumulations.accumulate(values, i * 1.0);
+    }
+
+    List outputValues = accumulations.getOutput(values);
+    Assert.assertTrue((Double)accumulations.getSubOutput(averageTag, 
outputValues) == 5.5);
+  }
+}

Reply via email to