http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java new file mode 100644 index 0000000..337cefb --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Sink; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link InvertIndex} <p> + * + */ +@Deprecated +public class InvertIndexTest +{ + private static Logger log = LoggerFactory.getLogger(InvertIndexTest.class); + + /** + * Test oper logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + InvertIndex<String,String> oper = new InvertIndex<String,String>(); + CollectorTestSink indexSink = new CollectorTestSink(); + + Sink inSink = oper.data.getSink(); + oper.index.setSink(indexSink); + + oper.beginWindow(0); + + HashMap<String, String> input = new HashMap<String, String>(); + + input.put("a", "str"); + input.put("b", "str"); + inSink.put(input); + + input.clear(); + input.put("a", "str1"); + input.put("b", "str1"); + inSink.put(input); + + input.clear(); + input.put("c", "blah"); + inSink.put(input); + + input.clear(); + input.put("c", "str1"); + inSink.put(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 3, indexSink.collectedTuples.size()); + for (Object o: indexSink.collectedTuples) { + log.debug(o.toString()); + HashMap<String, ArrayList<String>> output = (HashMap<String, ArrayList<String>>)o; + for (Map.Entry<String, ArrayList<String>> e: output.entrySet()) { + String key = e.getKey(); + ArrayList<String> alist = e.getValue(); + if (key.equals("str1")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c")); + + 
} else if (key.equals("str")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", false, alist.contains("c")); + + } else if (key.equals("blah")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", false, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", false, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c")); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java new file mode 100644 index 0000000..c930932 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link LastMatchMap}<p> + * + */ +@Deprecated +public class LastMatchMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new LastMatchMap<String, Integer>()); + testNodeProcessingSchema(new LastMatchMap<String, Double>()); + testNodeProcessingSchema(new LastMatchMap<String, Float>()); + testNodeProcessingSchema(new LastMatchMap<String, Short>()); + testNodeProcessingSchema(new LastMatchMap<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(LastMatchMap oper) + { + CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); + oper.last.setSink(matchSink); + oper.setKey("a"); + oper.setValue(3); + oper.setTypeEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 4); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.put("a", 3); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 2); + oper.data.process(input); + input.clear(); + input.put("a", 4); + input.put("b", 21); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 3); + input.put("b", 52); + input.put("c", 5); + oper.data.process(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + HashMap<String, Number> tuple = (HashMap<String, Number>)matchSink.tuple; + Number aval = tuple.get("a"); + Number bval = tuple.get("b"); + Assert.assertEquals("Value of a was ", 3, aval.intValue()); + Assert.assertEquals("Value of a was ", 52, bval.intValue()); + matchSink.clear(); + + 
oper.beginWindow(0); + input.clear(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 5); + oper.data.process(input); + oper.endWindow(); + // There should be no emit as all tuples do not match + Assert.assertEquals("number emitted tuples", 0, matchSink.count); + matchSink.clear(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java new file mode 100644 index 0000000..2c0fcad --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link LeastFrequentKeyMap}<p> + * + */ +@Deprecated +public class LeastFrequentKeyMapTest +{ + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + LeastFrequentKeyMap<String, Integer> oper = new LeastFrequentKeyMap<String, Integer>(); + CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); + CountAndLastTupleTestSink listSink = new CountAndLastTupleTestSink(); + oper.least.setSink(matchSink); + oper.list.setSink(listSink); + + oper.beginWindow(0); + HashMap<String, Integer> amap = new HashMap<String, Integer>(1); + HashMap<String, Integer> bmap = new HashMap<String, Integer>(1); + HashMap<String, Integer> cmap = new HashMap<String, Integer>(1); + int atot = 5; + int btot = 3; + int ctot = 6; + amap.put("a", null); + bmap.put("b", null); + cmap.put("c", null); + for (int i = 0; i < atot; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot; i++) { + oper.data.process(cmap); + } + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + HashMap<String, Integer> tuple = (HashMap<String, Integer>)matchSink.tuple; + Integer val = tuple.get("b"); + Assert.assertEquals("Count of b was ", btot, val.intValue()); + Assert.assertEquals("number emitted tuples", 1, listSink.count); + ArrayList<HashMap<String, Integer>> list = (ArrayList<HashMap<String, Integer>>)listSink.tuple; + val = list.get(0).get("b"); + Assert.assertEquals("Count of b was ", btot, val.intValue()); + + matchSink.clear(); + listSink.clear(); + oper.beginWindow(0); + atot = 5; + btot = 10; + ctot 
= 5; + for (int i = 0; i < atot; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot; i++) { + oper.data.process(cmap); + } + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + Assert.assertEquals("number emitted tuples", 1, listSink.count); + list = (ArrayList<HashMap<String, Integer>>)listSink.tuple; + int acount = 0; + int ccount = 0; + for (HashMap<String, Integer> h : list) { + val = h.get("a"); + if (val == null) { + ccount = h.get("c"); + } else { + acount = val; + } + } + Assert.assertEquals("Count of a was ", atot, acount); + Assert.assertEquals("Count of c was ", ctot, ccount); + HashMap<String, Integer> mtuple = (HashMap<String, Integer>)matchSink.tuple; + val = mtuple.get("a"); + if (val == null) { + val = mtuple.get("c"); + } + Assert.assertEquals("Count of least frequent key was ", ctot, val.intValue()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java new file mode 100644 index 0000000..287312c --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link LeastFrequentKeyValueMap}<p> + * + */ +@Deprecated +public class LeastFrequentKeyValueMapTest +{ + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + LeastFrequentKeyValueMap<String, Integer> oper = new LeastFrequentKeyValueMap<String, Integer>(); + CollectorTestSink matchSink = new CollectorTestSink(); + oper.least.setSink(matchSink); + + oper.beginWindow(0); + HashMap<String, Integer> amap = new HashMap<String, Integer>(1); + HashMap<String, Integer> bmap = new HashMap<String, Integer>(1); + HashMap<String, Integer> cmap = new HashMap<String, Integer>(1); + int atot1 = 5; + int btot1 = 3; + int ctot1 = 6; + amap.put("a", 1); + bmap.put("b", 2); + cmap.put("c", 4); + for (int i = 0; i < atot1; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot1; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot1; i++) { + oper.data.process(cmap); + } + + atot1 = 4; + btot1 = 3; + ctot1 = 10; + amap.put("a", 5); + bmap.put("b", 4); + cmap.put("c", 3); + for (int i = 0; i < atot1; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot1; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot1; i++) { + oper.data.process(cmap); + } + + 
oper.endWindow(); + Assert.assertEquals("number emitted tuples", 3, matchSink.collectedTuples.size()); + int vcount; + for (Object o: matchSink.collectedTuples) { + HashMap<String, HashMap<Integer, Integer>> omap = (HashMap<String, HashMap<Integer, Integer>>)o; + for (Map.Entry<String, HashMap<Integer, Integer>> e: omap.entrySet()) { + String key = e.getKey(); + if (key.equals("a")) { + vcount = e.getValue().get(5); + Assert.assertEquals("Key \"a\" has value ", 4, vcount); + + } else if (key.equals("b")) { + vcount = e.getValue().get(2); + Assert.assertEquals("Key \"a\" has value ", 3, vcount); + vcount = e.getValue().get(4); + Assert.assertEquals("Key \"a\" has value ", 3, vcount); + + } else if (key.equals("c")) { + vcount = e.getValue().get(4); + Assert.assertEquals("Key \"a\" has value ", 6, vcount); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java new file mode 100644 index 0000000..1d2b5ff --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.algo.MatchMap; +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link com.datatorrent.lib.algo.MatchMap}<p> + * + */ +@Deprecated +public class MatchMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new MatchMap<String, Integer>()); + testNodeProcessingSchema(new MatchMap<String, Double>()); + testNodeProcessingSchema(new MatchMap<String, Float>()); + testNodeProcessingSchema(new MatchMap<String, Short>()); + testNodeProcessingSchema(new MatchMap<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(MatchMap oper) + { + CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); + oper.match.setSink(matchSink); + oper.setKey("a"); + oper.setValue(3.0); + oper.setTypeNEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 3); + oper.data.process(input); + oper.endWindow(); + + // One for each key + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)matchSink.tuple).entrySet()) { + if 
(e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", new Double(2), new Double(e.getValue().doubleValue())); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), new Double(e.getValue().doubleValue())); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), new Double(e.getValue().doubleValue())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java new file mode 100644 index 0000000..58a1059 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link MostFrequentKeyMap}<p> + * + */ +@Deprecated +public class MostFrequentKeyMapTest +{ + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + MostFrequentKeyMap<String, Integer> oper = new MostFrequentKeyMap<String, Integer>(); + CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); + CountAndLastTupleTestSink listSink = new CountAndLastTupleTestSink(); + oper.most.setSink(matchSink); + oper.list.setSink(listSink); + + oper.beginWindow(0); + HashMap<String, Integer> amap = new HashMap<String, Integer>(1); + HashMap<String, Integer> bmap = new HashMap<String, Integer>(1); + HashMap<String, Integer> cmap = new HashMap<String, Integer>(1); + int atot = 5; + int btot = 7; + int ctot = 6; + amap.put("a", null); + bmap.put("b", null); + cmap.put("c", null); + for (int i = 0; i < atot; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot; i++) { + oper.data.process(cmap); + } + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + HashMap<String, Integer> tuple = (HashMap<String, Integer>)matchSink.tuple; + Integer val = tuple.get("b"); + Assert.assertEquals("Count of b was ", btot, val.intValue()); + Assert.assertEquals("number emitted tuples", 1, listSink.count); + ArrayList<HashMap<String, Integer>> list = (ArrayList<HashMap<String, Integer>>)listSink.tuple; + val = list.get(0).get("b"); + Assert.assertEquals("Count of b was ", btot, val.intValue()); + + matchSink.clear(); + listSink.clear(); + oper.beginWindow(0); + atot = 5; + btot = 4; + ctot = 
5; + for (int i = 0; i < atot; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot; i++) { + oper.data.process(cmap); + } + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + Assert.assertEquals("number emitted tuples", 1, listSink.count); + list = (ArrayList<HashMap<String, Integer>>)listSink.tuple; + int acount = 0; + int ccount = 0; + for (HashMap<String, Integer> h : list) { + val = h.get("a"); + if (val == null) { + ccount = h.get("c"); + } else { + acount = val; + } + } + Assert.assertEquals("Count of a was ", atot, acount); + Assert.assertEquals("Count of c was ", ctot, ccount); + HashMap<String, Integer> mtuple = (HashMap<String, Integer>)matchSink.tuple; + val = mtuple.get("a"); + if (val == null) { + val = mtuple.get("c"); + } + Assert.assertEquals("Count of least frequent key was ", ctot, val.intValue()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java new file mode 100644 index 0000000..c356079 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link MostFrequentKeyValueMap}<p> + * + */ +@Deprecated +public class MostFrequentKeyValueMapTest +{ + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + MostFrequentKeyValueMap<String, Integer> oper = new MostFrequentKeyValueMap<String, Integer>(); + CollectorTestSink matchSink = new CollectorTestSink(); + oper.most.setSink(matchSink); + + oper.beginWindow(0); + HashMap<String, Integer> amap = new HashMap<String, Integer>(1); + HashMap<String, Integer> bmap = new HashMap<String, Integer>(1); + HashMap<String, Integer> cmap = new HashMap<String, Integer>(1); + int atot1 = 5; + int btot1 = 3; + int ctot1 = 6; + amap.put("a", 1); + bmap.put("b", 2); + cmap.put("c", 4); + for (int i = 0; i < atot1; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot1; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot1; i++) { + oper.data.process(cmap); + } + + atot1 = 4; + btot1 = 3; + ctot1 = 10; + amap.put("a", 5); + bmap.put("b", 4); + cmap.put("c", 3); + for (int i = 0; i < atot1; i++) { + oper.data.process(amap); + } + for (int i = 0; i < btot1; i++) { + oper.data.process(bmap); + } + for (int i = 0; i < ctot1; i++) { + oper.data.process(cmap); + } + + 
oper.endWindow(); + Assert.assertEquals("number emitted tuples", 3, matchSink.collectedTuples.size()); + int vcount; + for (Object o: matchSink.collectedTuples) { + HashMap<String, HashMap<Integer, Integer>> omap = (HashMap<String, HashMap<Integer, Integer>>)o; + for (Map.Entry<String, HashMap<Integer, Integer>> e: omap.entrySet()) { + String key = e.getKey(); + if (key.equals("a")) { + vcount = e.getValue().get(1); + Assert.assertEquals("Key \"a\" has value ", 5, vcount); + + } else if (key.equals("b")) { + vcount = e.getValue().get(2); + Assert.assertEquals("Key \"a\" has value ", 3, vcount); + vcount = e.getValue().get(4); + Assert.assertEquals("Key \"a\" has value ", 3, vcount); + + } else if (key.equals("c")) { + vcount = e.getValue().get(3); + Assert.assertEquals("Key \"a\" has value ", 10, vcount); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java new file mode 100644 index 0000000..e91b0f9 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountTestSink; + +/** + * @deprecated + * Functional tests for {@link Sampler}<p> + * + */ +@Deprecated +public class SamplerTest +{ + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + Sampler<String> oper = new Sampler<String>(); + CountTestSink sink = new CountTestSink<String>(); + oper.sample.setSink(sink); + oper.setSamplingPercentage(.1); + + String tuple = "a"; + + + int numTuples = 10000; + oper.beginWindow(0); + for (int i = 0; i < numTuples; i++) { + oper.data.process(tuple); + } + + oper.endWindow(); + int lowerlimit = 5; + int upperlimit = 15; + int actual = (100 * sink.count) / numTuples; + + Assert.assertEquals("number emitted tuples", true, lowerlimit < actual); + Assert.assertEquals("number emitted tuples", true, upperlimit > actual); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java new file mode 100644 index 0000000..a5bd664 --- /dev/null +++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.math; + +import org.junit.Assert; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; + +/** + * + * Functional tests for {@link ChangeAlertKeyVal}. + * <p> + * @deprecated + */ +@Deprecated +public class ChangeAlertKeyValTest +{ + private static Logger log = LoggerFactory + .getLogger(ChangeAlertKeyValTest.class); + + /** + * Test node logic emits correct results. 
+ */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ChangeAlertKeyVal<String, Integer>()); + testNodeProcessingSchema(new ChangeAlertKeyVal<String, Double>()); + testNodeProcessingSchema(new ChangeAlertKeyVal<String, Float>()); + testNodeProcessingSchema(new ChangeAlertKeyVal<String, Short>()); + testNodeProcessingSchema(new ChangeAlertKeyVal<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public <V extends Number> void testNodeProcessingSchema( + ChangeAlertKeyVal<String, V> oper) + { + CollectorTestSink alertSink = new CollectorTestSink(); + + oper.alert.setSink(alertSink); + oper.setPercentThreshold(5); + + oper.beginWindow(0); + oper.data.process(new KeyValPair<String, V>("a", oper.getValue(200))); + oper.data.process(new KeyValPair<String, V>("b", oper.getValue(10))); + oper.data.process(new KeyValPair<String, V>("c", oper.getValue(100))); + + oper.data.process(new KeyValPair<String, V>("a", oper.getValue(203))); + oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12))); + oper.data.process(new KeyValPair<String, V>("c", oper.getValue(101))); + + oper.data.process(new KeyValPair<String, V>("a", oper.getValue(210))); + oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12))); + oper.data.process(new KeyValPair<String, V>("c", oper.getValue(102))); + + oper.data.process(new KeyValPair<String, V>("a", oper.getValue(231))); + oper.data.process(new KeyValPair<String, V>("b", oper.getValue(18))); + oper.data.process(new KeyValPair<String, V>("c", oper.getValue(103))); + oper.endWindow(); + + // One for a, Two for b + Assert.assertEquals("number emitted tuples", 3, + alertSink.collectedTuples.size()); + + double aval = 0; + double bval = 0; + log.debug("\nLogging tuples"); + for (Object o : alertSink.collectedTuples) { + KeyValPair<String, KeyValPair<Number, Double>> map = (KeyValPair<String, KeyValPair<Number, Double>>)o; + + log.debug(o.toString()); + if 
(map.getKey().equals("a")) { + KeyValPair<Number, Double> vmap = map.getValue(); + if (vmap != null) { + aval += vmap.getValue().doubleValue(); + } + } else { + KeyValPair<Number, Double> vmap = map.getValue(); + if (vmap != null) { + bval += vmap.getValue().doubleValue(); + } + } + } + Assert.assertEquals("change in a", 10.0, aval,0); + Assert.assertEquals("change in b", 70.0, bval,0); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java new file mode 100644 index 0000000..aa757af --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.math; + +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * + * Functional tests for {@link ChangeAlertMap}. + * <p> + * @deprecated + */ +@Deprecated +public class ChangeAlertMapTest +{ + private static Logger log = LoggerFactory.getLogger(ChangeAlertMapTest.class); + + /** + * Test node logic emits correct results. + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ChangeAlertMap<String, Integer>()); + testNodeProcessingSchema(new ChangeAlertMap<String, Double>()); + testNodeProcessingSchema(new ChangeAlertMap<String, Float>()); + testNodeProcessingSchema(new ChangeAlertMap<String, Short>()); + testNodeProcessingSchema(new ChangeAlertMap<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public <V extends Number> void testNodeProcessingSchema( + ChangeAlertMap<String, V> oper) + { + CollectorTestSink alertSink = new CollectorTestSink(); + + oper.alert.setSink(alertSink); + oper.setPercentThreshold(5); + + oper.beginWindow(0); + HashMap<String, V> input = new HashMap<String, V>(); + input.put("a", oper.getValue(200)); + input.put("b", oper.getValue(10)); + input.put("c", oper.getValue(100)); + oper.data.process(input); + + input.clear(); + input.put("a", oper.getValue(203)); + input.put("b", oper.getValue(12)); + input.put("c", oper.getValue(101)); + oper.data.process(input); + + input.clear(); + input.put("a", oper.getValue(210)); + input.put("b", oper.getValue(12)); + input.put("c", oper.getValue(102)); + oper.data.process(input); + + input.clear(); + input.put("a", oper.getValue(231)); + input.put("b", oper.getValue(18)); + input.put("c", oper.getValue(103)); + oper.data.process(input); + oper.endWindow(); + + // One for a, Two for b + Assert.assertEquals("number emitted tuples", 3, + 
alertSink.collectedTuples.size()); + + double aval = 0; + double bval = 0; + log.debug("\nLogging tuples"); + for (Object o : alertSink.collectedTuples) { + HashMap<String, HashMap<Number, Double>> map = (HashMap<String, HashMap<Number, Double>>)o; + Assert.assertEquals("map size", 1, map.size()); + log.debug(o.toString()); + HashMap<Number, Double> vmap = map.get("a"); + if (vmap != null) { + aval += vmap.get(231.0).doubleValue(); + } + vmap = map.get("b"); + if (vmap != null) { + if (vmap.get(12.0) != null) { + bval += vmap.get(12.0).doubleValue(); + } else { + bval += vmap.get(18.0).doubleValue(); + } + } + } + Assert.assertEquals("change in a", 10.0, aval,0); + Assert.assertEquals("change in b", 70.0, bval,0); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java new file mode 100644 index 0000000..745b7e5 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.math; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; + +/** + * + * Functional tests for {@link ChangeAlert}. <p> + * @deprecated + */ +@Deprecated +public class ChangeAlertTest +{ + private static Logger log = LoggerFactory.getLogger(ChangeAlertTest.class); + + /** + * Test node logic emits correct results. + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ChangeAlert<Integer>()); + testNodeProcessingSchema(new ChangeAlert<Double>()); + testNodeProcessingSchema(new ChangeAlert<Float>()); + testNodeProcessingSchema(new ChangeAlert<Short>()); + testNodeProcessingSchema(new ChangeAlert<Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public <V extends Number> void testNodeProcessingSchema(ChangeAlert<V> oper) + { + CollectorTestSink alertSink = new CollectorTestSink(); + + oper.alert.setSink(alertSink); + oper.setPercentThreshold(5); + + oper.beginWindow(0); + oper.data.process(oper.getValue(10)); + oper.data.process(oper.getValue(12)); // alert + oper.data.process(oper.getValue(12)); + oper.data.process(oper.getValue(18)); // alert + oper.data.process(oper.getValue(0)); // alert + oper.data.process(oper.getValue(20)); // this will not alert + oper.data.process(oper.getValue(30)); // alert + + oper.endWindow(); + + // Four alerts: the tuples 12, 18, 0 and 30 marked above + Assert.assertEquals("number 
emitted tuples", 4, alertSink.collectedTuples.size()); + + double aval = 0; + log.debug("\nLogging tuples"); + for (Object o: alertSink.collectedTuples) { + KeyValPair<Number, Double> map = (KeyValPair<Number, Double>)o; + log.debug(o.toString()); + aval += map.getValue().doubleValue(); + } + Assert.assertEquals("change in a", 220.0, aval,0); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java new file mode 100644 index 0000000..0b3318c --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.math; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; + +/** + * + * Functional tests for {@link ChangeKeyVal}. + * <p> + * @deprecated + */ +@Deprecated +public class ChangeKeyValTest +{ + private static Logger log = LoggerFactory.getLogger(ChangeKeyValTest.class); + + /** + * Test node logic emits correct results. + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ChangeKeyVal<String, Integer>()); + testNodeProcessingSchema(new ChangeKeyVal<String, Double>()); + testNodeProcessingSchema(new ChangeKeyVal<String, Float>()); + testNodeProcessingSchema(new ChangeKeyVal<String, Short>()); + testNodeProcessingSchema(new ChangeKeyVal<String, Long>()); + } + + /** + * + * @param oper + * key/value pair for comparison. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public <V extends Number> void testNodeProcessingSchema( + ChangeKeyVal<String, V> oper) + { + CollectorTestSink changeSink = new CollectorTestSink(); + CollectorTestSink percentSink = new CollectorTestSink(); + + oper.change.setSink(changeSink); + oper.percent.setSink(percentSink); + + oper.beginWindow(0); + oper.base.process(new KeyValPair<String, V>("a", oper.getValue(2))); + oper.base.process(new KeyValPair<String, V>("b", oper.getValue(10))); + oper.base.process(new KeyValPair<String, V>("c", oper.getValue(100))); + + oper.data.process(new KeyValPair<String, V>("a", oper.getValue(3))); + oper.data.process(new KeyValPair<String, V>("b", oper.getValue(2))); + oper.data.process(new KeyValPair<String, V>("c", oper.getValue(4))); + + oper.endWindow(); + + // One for each key + Assert.assertEquals("number emitted tuples", 3, + changeSink.collectedTuples.size()); + Assert.assertEquals("number emitted tuples", 3, + 
percentSink.collectedTuples.size()); + + log.debug("\nLogging tuples"); + for (Object o : changeSink.collectedTuples) { + KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o; + if (kv.getKey().equals("a")) { + Assert.assertEquals("change in a ", 1.0, kv.getValue()); + } + if (kv.getKey().equals("b")) { + Assert.assertEquals("change in b ", -8.0, kv.getValue()); + } + if (kv.getKey().equals("c")) { + Assert.assertEquals("change in c ", -96.0, kv.getValue()); + } + } + + for (Object o : percentSink.collectedTuples) { + KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o; + if (kv.getKey().equals("a")) { + Assert.assertEquals("change in a ", 50.0, kv.getValue()); + } + if (kv.getKey().equals("b")) { + Assert.assertEquals("change in b ", -80.0, kv.getValue()); + } + if (kv.getKey().equals("c")) { + Assert.assertEquals("change in c ", -96.0, kv.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java new file mode 100644 index 0000000..9ce0a73 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.math; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * + * Functional tests for {@link Change}. + * <p> + * @deprecated + */ +@Deprecated +public class ChangeTest +{ + private static Logger log = LoggerFactory.getLogger(ChangeTest.class); + + /** + * Test node logic emits correct results. + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new Change<Integer>()); + testNodeProcessingSchema(new Change<Double>()); + testNodeProcessingSchema(new Change<Float>()); + testNodeProcessingSchema(new Change<Short>()); + testNodeProcessingSchema(new Change<Long>()); + } + + /** + * + * @param oper Data value for comparison. 
+ */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public <V extends Number> void testNodeProcessingSchema(Change<V> oper) + { + CollectorTestSink changeSink = new CollectorTestSink(); + CollectorTestSink percentSink = new CollectorTestSink(); + + oper.change.setSink(changeSink); + oper.percent.setSink(percentSink); + + oper.beginWindow(0); + oper.base.process(oper.getValue(10)); + oper.data.process(oper.getValue(5)); + oper.data.process(oper.getValue(15)); + oper.data.process(oper.getValue(20)); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 3, + changeSink.collectedTuples.size()); + Assert.assertEquals("number emitted tuples", 3, + percentSink.collectedTuples.size()); + + log.debug("\nLogging tuples"); + for (Object o : changeSink.collectedTuples) { + log.debug(String.format("change %s", o)); + } + for (Object o : percentSink.collectedTuples) { + log.debug(String.format("percent change %s", o)); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java new file mode 100644 index 0000000..9785a4a --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.math; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * + * Functional tests for {@link CompareExceptMap}<p> + * @deprecated + */ +@Deprecated +public class CompareExceptMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new CompareExceptMap<String, Integer>()); + testNodeProcessingSchema(new CompareExceptMap<String, Double>()); + testNodeProcessingSchema(new CompareExceptMap<String, Float>()); + testNodeProcessingSchema(new CompareExceptMap<String, Short>()); + testNodeProcessingSchema(new CompareExceptMap<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(CompareExceptMap oper) + { + CountAndLastTupleTestSink compareSink = new CountAndLastTupleTestSink(); + CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink(); + oper.compare.setSink(compareSink); + oper.except.setSink(exceptSink); + + oper.setKey("a"); + oper.setValue(3.0); + oper.setTypeEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 3); + input.put("b", 21); + input.put("c", 30); + oper.data.process(input); + oper.endWindow(); + + // One for each key + 
Assert.assertEquals("number emitted tuples", 1, exceptSink.count); + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple).entrySet()) { + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0); + } + } + + Assert.assertEquals("number emitted tuples", 1, compareSink.count); + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)compareSink.tuple).entrySet()) { + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", new Double(3), e.getValue().doubleValue(), 0); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", new Double(21), e.getValue().doubleValue(), 0); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", new Double(30), e.getValue().doubleValue(), 0); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java new file mode 100644 index 0000000..c91e4c9 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.math; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * + * Functional tests for {@link CompareMap}<p> + * @deprecated + */ +@Deprecated +public class CompareMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new CompareMap<String, Integer>()); + testNodeProcessingSchema(new CompareMap<String, Double>()); + testNodeProcessingSchema(new CompareMap<String, Float>()); + testNodeProcessingSchema(new CompareMap<String, Short>()); + testNodeProcessingSchema(new CompareMap<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(CompareMap oper) + { + CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); + oper.compare.setSink(matchSink); + oper.setKey("a"); + oper.setValue(3.0); + oper.setTypeNEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 3); + oper.data.process(input); + oper.endWindow(); + + // One for each key + Assert.assertEquals("number emitted tuples", 1, 
matchSink.count); + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)matchSink.tuple).entrySet()) { + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java new file mode 100644 index 0000000..317790a --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.math; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; + +/** + * + * Functional tests for {@link CountKeyVal}. <p> + * @deprecated + */ +@Deprecated +public class CountKeyValTest +{ + /** + * Test operator logic emits correct results. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() + { + CountKeyVal<String, Double> oper = new CountKeyVal<String, Double>(); + CollectorTestSink countSink = new CollectorTestSink(); + oper.count.setSink(countSink); + + oper.beginWindow(0); // + + oper.data.process(new KeyValPair("a", 2.0)); + oper.data.process(new KeyValPair("b", 20.0)); + oper.data.process(new KeyValPair("c", 1000.0)); + oper.data.process(new KeyValPair("a", 1.0)); + oper.data.process(new KeyValPair("a", 10.0)); + oper.data.process(new KeyValPair("b", 5.0)); + oper.data.process(new KeyValPair("d", 55.0)); + oper.data.process(new KeyValPair("b", 12.0)); + oper.data.process(new KeyValPair("d", 22.0)); + oper.data.process(new KeyValPair("d", 14.2)); + oper.data.process(new KeyValPair("d", 46.0)); + oper.data.process(new KeyValPair("e", 2.0)); + oper.data.process(new KeyValPair("a", 23.0)); + oper.data.process(new KeyValPair("d", 4.0)); + + oper.endWindow(); // + + // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e" + Assert.assertEquals("number emitted tuples", 5, countSink.collectedTuples.size()); + for (Object o : countSink.collectedTuples) { + KeyValPair<String, Integer> e = (KeyValPair<String, Integer>)o; + Integer val = (Integer)e.getValue(); + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", 4, val.intValue()); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", 3, val.intValue()); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", 
1, val.intValue()); + } else if (e.getKey().equals("d")) { + Assert.assertEquals("emitted tuple for 'd' was ", 5, val.intValue()); + } else if (e.getKey().equals("e")) { + Assert.assertEquals("emitted tuple for 'e' was ", 1, val.intValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java new file mode 100644 index 0000000..8c8b267 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.math; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link ExceptMap} + */ +@Deprecated +public class ExceptMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ExceptMap<String, Integer>()); + testNodeProcessingSchema(new ExceptMap<String, Double>()); + testNodeProcessingSchema(new ExceptMap<String, Float>()); + testNodeProcessingSchema(new ExceptMap<String, Short>()); + testNodeProcessingSchema(new ExceptMap<String, Long>()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testNodeProcessingSchema(ExceptMap oper) + { + CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink(); + oper.except.setSink(exceptSink); + oper.setKey("a"); + oper.setValue(3.0); + oper.setTypeEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 3); + oper.data.process(input); + oper.endWindow(); + + // One for each key + Assert.assertEquals("number emitted tuples", 1, exceptSink.count); + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple) + .entrySet()) { + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", new Double(2), e + .getValue().doubleValue(), 0); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e + .getValue().doubleValue(), 0); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e + .getValue().doubleValue(), 0); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java new file mode 100644 index 0000000..7682ae3 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.math; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link QuotientMap} + */ +@Deprecated +public class QuotientMapTest +{ + private static final Logger LOG = LoggerFactory.getLogger(QuotientMapTest.class); + + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new QuotientMap<String, Integer>()); + testNodeProcessingSchema(new QuotientMap<String, Double>()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void testNodeProcessingSchema(QuotientMap oper) throws Exception + { + CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink(); + + oper.quotient.setSink(quotientSink); + oper.setMult_by(2); + + oper.beginWindow(0); // + HashMap<String, Number> input = new HashMap<String, Number>(); + int numtuples = 100; + for (int i = 0; i < numtuples; i++) { + input.clear(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.numerator.process(input); + input.clear(); + input.put("a", 2); + input.put("b", 40); + input.put("c", 500); + oper.denominator.process(input); + } + + oper.endWindow(); + + // A single map tuple is expected, holding the quotient for every key + Assert.assertEquals("number emitted tuples", 1, quotientSink.count); + HashMap<String, Number> output = (HashMap<String, Number>)quotientSink.tuple; + for (Map.Entry<String, Number> e : output.entrySet()) { + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", 2d, + e.getValue()); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", 1d, + e.getValue()); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", 4d, + e.getValue()); + } else { + 
LOG.debug(String.format("key was %s", e.getKey())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java new file mode 100644 index 0000000..4d1c7ff --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.apex.malhar.contrib.misc.math;

import java.util.ArrayList;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;

import com.datatorrent.api.Sink;

/**
 * Functional tests for {@link Quotient}.
 *
 * @deprecated
 */
@Deprecated
public class QuotientTest
{

  /**
   * Sink that records every payload it is handed, in arrival order.
   * Declared static because it never touches the enclosing test instance.
   */
  static class TestSink implements Sink<Object>
  {
    final List<Object> collectedTuples = new ArrayList<Object>();

    @Override
    public void put(Object payload)
    {
      collectedTuples.add(payload);
    }

    @Override
    public int getCount(boolean reset)
    {
      throw new UnsupportedOperationException("Not supported yet.");
    }
  }

  /**
   * Feeds three denominator and nine numerator values in one window (with
   * {@code mult_by} set to 2) and expects a single emitted value of 2.0.
   */
  @Test
  public void testNodeSchemaProcessing()
  {
    Quotient<Double> oper = new Quotient<Double>();
    TestSink quotientSink = new TestSink();
    oper.quotient.setSink(quotientSink);

    oper.setMult_by(2);

    // Both series add up to 150; the asserted result of 2.0 is consistent
    // with mult_by * sum(numerator) / sum(denominator).
    Double[] denominators = {30.0, 20.0, 100.0};
    Double[] numerators = {5.0, 1.0, 44.0, 10.0, 22.0, 18.0, 0.5, 41.5, 8.0};

    oper.beginWindow(0);
    for (Double d : denominators) {
      oper.denominator.process(d);
    }
    for (Double n : numerators) {
      oper.numerator.process(n);
    }
    oper.endWindow();

    // One quotient value is expected for the whole window.
    Assert.assertEquals("number emitted tuples", 1,
        quotientSink.collectedTuples.size());
    for (Object o : quotientSink.collectedTuples) {
      Double val = (Double)o;
      Assert.assertEquals("emitted quotient value was ", Double.valueOf(2.0), val);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java
---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java new file mode 100644 index 0000000..a50a3bd --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java @@ -0,0 +1,156 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.contrib.misc.math;

import java.util.HashMap;
import java.util.Map;

import org.junit.Assert;
import org.junit.Test;

import com.datatorrent.lib.testbench.CollectorTestSink;

/**
 * Functional tests for {@link SumCountMap}.
 *
 * @deprecated
 */
@Deprecated
public class SumCountMapTest
{
  /** Tolerance for comparing accumulated floating-point sums. */
  private static final double DELTA = 1e-9;

  /**
   * Runs the scenario for every combination of connected/unconnected
   * {@code sum} and {@code count} output ports.
   */
  @Test
  public void testNodeProcessing()
  {
    testNodeSchemaProcessing(true, true);
    testNodeSchemaProcessing(true, false);
    testNodeSchemaProcessing(false, true);
    testNodeSchemaProcessing(false, false);
  }

  /**
   * Feeds eight maps with keys "a".."e" in one window and verifies the
   * per-key sums and counts on whichever output ports have a sink attached.
   *
   * @param sum   attach a sink to the {@code sum} port and verify it
   * @param count attach a sink to the {@code count} port and verify it
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public void testNodeSchemaProcessing(boolean sum, boolean count)
  {
    SumCountMap<String, Double> oper = new SumCountMap<String, Double>();
    oper.setType(Double.class);
    CollectorTestSink sumSink = new CollectorTestSink();
    CollectorTestSink countSink = new CollectorTestSink();
    if (sum) {
      oper.sum.setSink(sumSink);
    }
    if (count) {
      oper.count.setSink(countSink);
    }

    oper.beginWindow(0);

    // The same map instance is cleared and refilled for every tuple.
    HashMap<String, Double> input = new HashMap<String, Double>();

    input.put("a", 2.0);
    input.put("b", 20.0);
    input.put("c", 1000.0);
    oper.data.process(input);

    input.clear();
    input.put("a", 1.0);
    oper.data.process(input);

    input.clear();
    input.put("a", 10.0);
    input.put("b", 5.0);
    oper.data.process(input);

    input.clear();
    input.put("d", 55.0);
    input.put("b", 12.0);
    oper.data.process(input);

    input.clear();
    input.put("d", 22.0);
    oper.data.process(input);

    input.clear();
    input.put("d", 14.2);
    oper.data.process(input);

    input.clear();
    input.put("d", 46.0);
    input.put("e", 2.0);
    oper.data.process(input);

    input.clear();
    input.put("a", 23.0);
    input.put("d", 4.0);
    oper.data.process(input);
    input.clear();

    oper.endWindow();

    if (sum) {
      HashMap<String, Double> expectedSums = new HashMap<String, Double>();
      expectedSums.put("a", 36.0);
      expectedSums.put("b", 37.0);
      expectedSums.put("c", 1000.0);
      expectedSums.put("d", 141.2);
      expectedSums.put("e", 2.0);

      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
      Assert.assertEquals("number emitted tuples", 1, sumSink.collectedTuples.size());
      for (Object o : sumSink.collectedTuples) {
        HashMap<String, Object> output = (HashMap<String, Object>)o;
        // Driven by the expected keys so that a missing key fails the test
        // instead of being skipped.
        for (Map.Entry<String, Double> want : expectedSums.entrySet()) {
          Object got = output.get(want.getKey());
          Assert.assertNotNull("no sum emitted for '" + want.getKey() + "'", got);
          // Compared with a tolerance: the value for "d" is accumulated
          // from 14.2, which has no exact binary representation.
          Assert.assertEquals("emitted sum for '" + want.getKey() + "' was ",
              want.getValue().doubleValue(), ((Double)got).doubleValue(), DELTA);
        }
      }
    }

    if (count) {
      HashMap<String, Integer> expectedCounts = new HashMap<String, Integer>();
      expectedCounts.put("a", 4);
      expectedCounts.put("b", 3);
      expectedCounts.put("c", 1);
      expectedCounts.put("d", 5);
      expectedCounts.put("e", 1);

      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
      Assert.assertEquals("number emitted tuples", 1, countSink.collectedTuples.size());
      for (Object o : countSink.collectedTuples) {
        HashMap<String, Object> output = (HashMap<String, Object>)o;
        for (Map.Entry<String, Integer> want : expectedCounts.entrySet()) {
          Object got = output.get(want.getKey());
          Assert.assertNotNull("no count emitted for '" + want.getKey() + "'", got);
          Assert.assertEquals("emitted count for '" + want.getKey() + "' was ",
              want.getValue().intValue(), ((Integer)got).intValue());
        }
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java new file mode 100644 index 0000000..a535053 
--- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.DeleteOperator}. 
+ * @deprecated + */
@Deprecated
public class DeleteOperatorTest
{
  private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class);

  /**
   * Pushes three rows through a {@link DeleteOperator} whose condition is an
   * {@link EqualValueCondition} registered for column "a" and value 1, then
   * logs whatever the sink collected.
   *
   * NOTE(review): nothing is asserted here; the outcome is only visible in
   * the debug log. The expected output is not evident from this file.
   */
  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Test
  public void testSqlSelect()
  {
    // Operator under test, wired to a collecting sink.
    DeleteOperator oper = new DeleteOperator();

    EqualValueCondition equalsCondition = new EqualValueCondition();
    equalsCondition.addEqualValue("a", 1);
    oper.setCondition(equalsCondition);

    CollectorTestSink collector = new CollectorTestSink();
    oper.outport.setSink(collector);

    oper.setup(null);
    oper.beginWindow(1);

    // Each entry is one input row given as {a, b, c}.
    int[][] rows = {
      {0, 1, 2},
      {1, 3, 4},
      {1, 5, 6},
    };
    for (int[] r : rows) {
      oper.inport.process(row(r[0], r[1], r[2]));
    }

    oper.endWindow();
    oper.teardown();

    LOG.debug("{}", collector.collectedTuples);
  }

  /**
   * Builds a fresh three-column row keyed "a", "b" and "c".
   */
  private static HashMap<String, Object> row(int a, int b, int c)
  {
    HashMap<String, Object> columns = new HashMap<String, Object>();
    columns.put("a", a);
    columns.put("b", b);
    columns.put("c", c);
    return columns;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java new file mode 100644 index 0000000..762d322 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.apex.malhar.contrib.misc.streamquery;

import java.util.HashMap;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
import com.datatorrent.lib.streamquery.index.ColumnIndex;
import com.datatorrent.lib.testbench.CollectorTestSink;

/**
 * Smoke test that runs an {@link OuterJoinOperator} with
 * {@code setFullJoin(true)} over two small inputs.
 */
@Deprecated
public class FullOuterJoinOperatorTest
{
  private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class);

  /**
   * Sends three rows to {@code inport1} and two rows to {@code inport2} in
   * one window, joined on column "a", and logs the collected result.
   *
   * NOTE(review): nothing is asserted here; the outcome is only visible in
   * the debug log.
   */
  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Test
  public void testSqlSelect()
  {
    // Operator under test, switched to full join and wired to a sink.
    OuterJoinOperator oper = new OuterJoinOperator();
    oper.setFullJoin(true);
    CollectorTestSink collector = new CollectorTestSink();
    oper.outport.setSink(collector);

    // Join condition over column "a" of both inputs.
    Condition joinOnA = new JoinColumnEqualCondition("a", "a");
    oper.setJoinCondition(joinOnA);

    // Selected columns: "b" from the first input, "c" from the second.
    oper.selectTable1Column(new ColumnIndex("b", null));
    oper.selectTable2Column(new ColumnIndex("c", null));

    oper.setup(null);
    oper.beginWindow(1);

    // Each entry is one input row given as {a, b, c}.
    int[][] firstInput = {
      {0, 1, 2},
      {1, 3, 4},
      {2, 11, 12},
    };
    for (int[] r : firstInput) {
      oper.inport1.process(row(r[0], r[1], r[2]));
    }

    int[][] secondInput = {
      {0, 7, 8},
      {1, 5, 6},
    };
    for (int[] r : secondInput) {
      oper.inport2.process(row(r[0], r[1], r[2]));
    }

    oper.endWindow();
    oper.teardown();

    LOG.debug("{}", collector.collectedTuples);
  }

  /**
   * Builds a fresh three-column row keyed "a", "b" and "c".
   */
  private static HashMap<String, Object> row(int a, int b, int c)
  {
    HashMap<String, Object> columns = new HashMap<String, Object>();
    columns.put("a", a);
    columns.put("b", b);
    columns.put("c", c);
    return columns;
  }

}
