Repository: apex-malhar Updated Branches: refs/heads/master dcca7752a -> 2b2d5bca9
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java new file mode 100644 index 0000000..cda6bf8 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.apex.malhar.lib.window.Tuple; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * Test for {@link ReduceFn}. + */ +public class FoldFnTest +{ + public static class NumGen extends BaseOperator implements InputOperator + { + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); + + public static int count = 0; + private int i = 0; + + public NumGen() + { + count = 0; + i = 0; + } + + @Override + public void emitTuples() + { + while (i <= 7) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore it. + } + count++; + if (i >= 0) { + output.emit(i++); + } + } + i = -1; + } + } + + public static class Collector extends BaseOperator + { + private static int result; + + public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>() + { + @Override + public void process(Tuple.WindowedTuple<Integer> tuple) + { + result = tuple.getValue(); + } + }; + + public int getResult() + { + return result; + } + } + + public static class Plus extends FoldFn<Integer, Integer> + { + @Override + public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2) + { + return fold(accumulatedValue1, accumulatedValue2); + } + + @Override + public Integer fold(Integer input1, Integer input2) + { + if (input1 == null) { + return input2; + } + return input1 + input2; + } + } + + @Test + public void FoldFnTest() + { + + FoldFn<String, String> concat = new FoldFn<String, String>() + { + @Override + public String merge(String accumulatedValue1, String accumulatedValue2) + { + return fold(accumulatedValue1, accumulatedValue2); + } + + @Override + public String fold(String input1, String input2) + { + return input1 + ", " + input2; + } + }; + + String[] ss = new String[]{"b", "c", "d", "e"}; + String base = "a"; + + for (String s : ss) { + base = concat.accumulate(base, s); + } + Assert.assertEquals("a, b, c, d, e", base); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java new file mode 100644 index 0000000..891a824 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.List; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link Group}. + */ +public class GroupTest +{ + @Test + public void GroupTest() + { + Group<Integer> group = new Group<>(); + + List<Integer> accu = group.defaultAccumulatedValue(); + Assert.assertEquals(0, accu.size()); + Assert.assertEquals(1, group.accumulate(accu, 10).size()); + Assert.assertEquals(2, group.accumulate(accu, 11).size()); + Assert.assertEquals(3, group.accumulate(accu, 11).size()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java new file mode 100644 index 0000000..fe87d9e --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Comparator; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for Max accumulation + */ +public class MaxTest +{ + @Test + public void MaxTest() + { + Max<Integer> max = new Max<>(); + + Assert.assertEquals((Integer)5, max.accumulate(5, 3)); + Assert.assertEquals((Integer)6, max.accumulate(4, 6)); + Assert.assertEquals((Integer)5, max.merge(5, 2)); + + Comparator<Integer> com = new Comparator<Integer>() + { + @Override + public int compare(Integer o1, Integer o2) + { + return -(o1.compareTo(o2)); + } + }; + + max.setComparator(com); + Assert.assertEquals((Integer)3, max.accumulate(5, 3)); + Assert.assertEquals((Integer)4, max.accumulate(4, 6)); + Assert.assertEquals((Integer)2, max.merge(5, 2)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java new file mode 100644 index 0000000..3589735 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Comparator; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link Min}. + */ +public class MinTest +{ + @Test + public void MinTest() + { + Min<Integer> min = new Min<>(); + + Assert.assertEquals((Integer)3, min.accumulate(5, 3)); + Assert.assertEquals((Integer)4, min.accumulate(4, 6)); + Assert.assertEquals((Integer)2, min.merge(5, 2)); + + Comparator<Integer> com = new Comparator<Integer>() + { + @Override + public int compare(Integer o1, Integer o2) + { + return -(o1.compareTo(o2)); + } + }; + + min.setComparator(com); + Assert.assertEquals((Integer)5, min.accumulate(5, 3)); + Assert.assertEquals((Integer)6, min.accumulate(4, 6)); + Assert.assertEquals((Integer)5, min.merge(5, 2)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java new file mode 100644 index 0000000..26d73a7 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link ReduceFn}. + */ +public class ReduceFnTest +{ + + @Test + public void ReduceFnTest() + { + ReduceFn<String> concat = new ReduceFn<String>() + { + @Override + public String reduce(String input1, String input2) + { + return input1 + ", " + input2; + } + }; + + String[] ss = new String[]{"b", "c", "d", "e"}; + String base = "a"; + + for (String s : ss) { + base = concat.accumulate(base, s); + } + Assert.assertEquals("a, b, c, d, e", base); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java new file mode 100644 index 0000000..674f871 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Set; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link RemoveDuplicates}. + */ +public class RemoveDuplicatesTest +{ + @Test + public void RemoveDuplicatesTest() + { + RemoveDuplicates<Integer> rd = new RemoveDuplicates<>(); + + Set<Integer> accu = rd.defaultAccumulatedValue(); + Assert.assertEquals(0, accu.size()); + Assert.assertEquals(1, rd.accumulate(accu, 10).size()); + Assert.assertEquals(2, rd.accumulate(accu, 11).size()); + Assert.assertEquals(2, rd.accumulate(accu, 11).size()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java new file mode 100644 index 0000000..4c55612 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.commons.lang.mutable.MutableDouble; +import org.apache.commons.lang.mutable.MutableFloat; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; + +/** + * Test for different Sum Accumulations. + */ +public class SumTest +{ + @Test + public void SumTest() + { + SumInt si = new SumInt(); + SumLong sl = new SumLong(); + SumFloat sf = new SumFloat(); + SumDouble sd = new SumDouble(); + + Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10)); + Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10)); + Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21))); + + Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L)); + Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L)); + Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L))); + + Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F)); + Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F)); + Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F))); + + Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0)); + Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0)); + Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9))); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java new file mode 100644 index 0000000..5bf2207 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * Unit test for TopNByKey accumulation + */ +public class TopNByKeyTest +{ + @Test + public void TopNByKeyTest() throws Exception + { + TopNByKey<String, Integer> topNByKey = new TopNByKey<>(); + topNByKey.setN(3); + Map<String, Integer> accu = topNByKey.defaultAccumulatedValue(); + + Assert.assertEquals(0, accu.size()); + + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1)); + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3)); + + List<KeyValPair<String, Integer>> result1 = new ArrayList<>(); + + result1.add(new KeyValPair<String, Integer>("3", 3)); + result1.add(new KeyValPair<String, Integer>("1", 1)); + + Assert.assertEquals(result1, topNByKey.getOutput(accu)); + + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2)); + + List<KeyValPair<String, Integer>> result2 = new ArrayList<>(); + + result2.add(new KeyValPair<String, Integer>("3", 3)); + result2.add(new KeyValPair<String, Integer>("2", 2)); + result2.add(new KeyValPair<String, Integer>("1", 1)); + + Assert.assertEquals(result2, topNByKey.getOutput(accu)); + + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5)); + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4)); + + List<KeyValPair<String, Integer>> result3 = new ArrayList<>(); + + result3.add(new KeyValPair<String, Integer>("5", 5)); + result3.add(new KeyValPair<String, Integer>("4", 4)); + result3.add(new KeyValPair<String, Integer>("3", 3)); + + Assert.assertEquals(result3, topNByKey.getOutput(accu)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java deleted file mode 100644 index fb4de3c..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.commons.lang3.tuple.MutablePair; - -/** - * Test for {@link Average}. - */ -public class AverageTest -{ - @Test - public void AverageTest() - { - Average ave = new Average(); - MutablePair<Double, Long> accu = ave.defaultAccumulatedValue(); - - for (int i = 1; i <= 10; i++) { - accu = ave.accumulate(accu, (double)i); - } - Assert.assertTrue(5.5 == accu.getLeft()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java deleted file mode 100644 index 4e6f8f1..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.apex.malhar.lib.window.Tuple; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -/** - * Test for {@link ReduceFn}. - */ -public class FoldFnTest -{ - public static class NumGen extends BaseOperator implements InputOperator - { - public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); - - public static int count = 0; - private int i = 0; - - public NumGen() - { - count = 0; - i = 0; - } - - @Override - public void emitTuples() - { - while (i <= 7) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore it. - } - count++; - if (i >= 0) { - output.emit(i++); - } - } - i = -1; - } - } - - public static class Collector extends BaseOperator - { - private static int result; - - public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>() - { - @Override - public void process(Tuple.WindowedTuple<Integer> tuple) - { - result = tuple.getValue(); - } - }; - - public int getResult() - { - return result; - } - } - - public static class Plus extends FoldFn<Integer, Integer> - { - @Override - public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2) - { - return fold(accumulatedValue1, accumulatedValue2); - } - - @Override - public Integer fold(Integer input1, Integer input2) - { - if (input1 == null) { - return input2; - } - return input1 + input2; - } - } - - @Test - public void FoldFnTest() - { - - FoldFn<String, String> concat = new FoldFn<String, String>() - { - @Override - public String merge(String accumulatedValue1, String accumulatedValue2) - { - return fold(accumulatedValue1, accumulatedValue2); - } - - @Override - public String fold(String input1, String input2) - { - return input1 + ", " + input2; - } - }; - - String[] ss = new String[]{"b", "c", "d", "e"}; - String base = "a"; - - for (String s : ss) { - base = concat.accumulate(base, s); - } - Assert.assertEquals("a, b, c, d, e", base); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java deleted file mode 100644 index a9aac77..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.List; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for {@link Group}. - */ -public class GroupTest -{ - @Test - public void GroupTest() - { - Group<Integer> group = new Group<>(); - - List<Integer> accu = group.defaultAccumulatedValue(); - Assert.assertEquals(0, accu.size()); - Assert.assertEquals(1, group.accumulate(accu, 10).size()); - Assert.assertEquals(2, group.accumulate(accu, 11).size()); - Assert.assertEquals(3, group.accumulate(accu, 11).size()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java deleted file mode 100644 index c873125..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Comparator; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for Max accumulation - */ -public class MaxTest -{ - @Test - public void MaxTest() - { - Max<Integer> max = new Max<>(); - - Assert.assertEquals((Integer)5, max.accumulate(5, 3)); - Assert.assertEquals((Integer)6, max.accumulate(4, 6)); - Assert.assertEquals((Integer)5, max.merge(5, 2)); - - Comparator<Integer> com = new Comparator<Integer>() - { - @Override - public int compare(Integer o1, Integer o2) - { - return -(o1.compareTo(o2)); - } - }; - - max.setComparator(com); - Assert.assertEquals((Integer)3, max.accumulate(5, 3)); - Assert.assertEquals((Integer)4, max.accumulate(4, 6)); - Assert.assertEquals((Integer)2, max.merge(5, 2)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java deleted file mode 100644 index 74816b0..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Comparator; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for {@link Min}. - */ -public class MinTest -{ - @Test - public void MinTest() - { - Min<Integer> min = new Min<>(); - - Assert.assertEquals((Integer)3, min.accumulate(5, 3)); - Assert.assertEquals((Integer)4, min.accumulate(4, 6)); - Assert.assertEquals((Integer)2, min.merge(5, 2)); - - Comparator<Integer> com = new Comparator<Integer>() - { - @Override - public int compare(Integer o1, Integer o2) - { - return -(o1.compareTo(o2)); - } - }; - - min.setComparator(com); - Assert.assertEquals((Integer)5, min.accumulate(5, 3)); - Assert.assertEquals((Integer)6, min.accumulate(4, 6)); - Assert.assertEquals((Integer)5, min.merge(5, 2)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java deleted file mode 100644 index 6b5bbad..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for {@link ReduceFn}. - */ -public class ReduceFnTest -{ - - @Test - public void ReduceFnTest() - { - ReduceFn<String> concat = new ReduceFn<String>() - { - @Override - public String reduce(String input1, String input2) - { - return input1 + ", " + input2; - } - }; - - String[] ss = new String[]{"b", "c", "d", "e"}; - String base = "a"; - - for (String s : ss) { - base = concat.accumulate(base, s); - } - Assert.assertEquals("a, b, c, d, e", base); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java deleted file mode 100644 index f0196d2..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Set; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Test for {@link RemoveDuplicates}. - */ -public class RemoveDuplicatesTest -{ - @Test - public void RemoveDuplicatesTest() - { - RemoveDuplicates<Integer> rd = new RemoveDuplicates<>(); - - Set<Integer> accu = rd.defaultAccumulatedValue(); - Assert.assertEquals(0, accu.size()); - Assert.assertEquals(1, rd.accumulate(accu, 10).size()); - Assert.assertEquals(2, rd.accumulate(accu, 11).size()); - Assert.assertEquals(2, rd.accumulate(accu, 11).size()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java deleted file mode 100644 index 65b6480..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.commons.lang.mutable.MutableDouble; -import org.apache.commons.lang.mutable.MutableFloat; -import org.apache.commons.lang.mutable.MutableInt; -import org.apache.commons.lang.mutable.MutableLong; - -/** - * Test for different Sum Accumulations. - */ -public class SumTest -{ - @Test - public void SumTest() - { - SumInt si = new SumInt(); - SumLong sl = new SumLong(); - SumFloat sf = new SumFloat(); - SumDouble sd = new SumDouble(); - - Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10)); - Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10)); - Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21))); - - Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L)); - Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L)); - Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L))); - - Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F)); - Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F)); - Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F))); - - Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0)); - Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0)); - Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9))); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java deleted file mode 100644 index 3f6ac09..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.util.KeyValPair; - -/** - * Unit test for TopNByKey accumulation - */ -public class TopNByKeyTest -{ - @Test - public void TopNByKeyTest() throws Exception - { - TopNByKey<String, Integer> topNByKey = new TopNByKey<>(); - topNByKey.setN(3); - Map<String, Integer> accu = topNByKey.defaultAccumulatedValue(); - - Assert.assertEquals(0, accu.size()); - - accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1)); - accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3)); - - List<KeyValPair<String, Integer>> result1 = new ArrayList<>(); - - result1.add(new KeyValPair<String, Integer>("3", 3)); - result1.add(new KeyValPair<String, Integer>("1", 1)); - - Assert.assertEquals(result1, topNByKey.getOutput(accu)); - - accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2)); - - List<KeyValPair<String, Integer>> result2 = new ArrayList<>(); - - result2.add(new KeyValPair<String, Integer>("3", 3)); - result2.add(new KeyValPair<String, Integer>("2", 2)); - result2.add(new KeyValPair<String, Integer>("1", 1)); - - Assert.assertEquals(result2, topNByKey.getOutput(accu)); - - accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5)); - accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4)); - - List<KeyValPair<String, Integer>> result3 = new ArrayList<>(); - - result3.add(new KeyValPair<String, Integer>("5", 5)); - result3.add(new KeyValPair<String, Integer>("4", 4)); - result3.add(new KeyValPair<String, Integer>("3", 3)); - - Assert.assertEquals(result3, topNByKey.getOutput(accu)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java index 84f05fc..0f5ce1e 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java @@ -25,10 +25,10 @@ import org.joda.time.Duration; import org.apache.apex.malhar.lib.window.Accumulation; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.accumulation.FoldFn; +import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; -import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn; -import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn; import org.apache.apex.malhar.stream.api.function.Function; import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java index ebd5eea..5866a4c 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java @@ -28,14 +28,14 @@ import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.accumulation.Count; +import org.apache.apex.malhar.lib.window.accumulation.FoldFn; +import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; +import org.apache.apex.malhar.lib.window.accumulation.TopN; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; -import org.apache.apex.malhar.lib.window.impl.accumulation.Count; -import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn; -import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn; -import org.apache.apex.malhar.lib.window.impl.accumulation.TopN; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.Option;
