http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/AverageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java index 1973bec..712bcdc 100644 --- a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java +++ b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.math; import org.junit.Assert; import org.junit.Test; +import com.datatorrent.common.util.Pair; + import com.datatorrent.lib.testbench.CollectorTestSink; /** @@ -96,7 +98,7 @@ public class AverageTest Assert.assertEquals("number emitted tuples", 1, averageSink.collectedTuples.size()); for (Object o : averageSink.collectedTuples) { // count is 12 - Integer val = ((Number)o).intValue(); + Number val = ((Pair<? extends Number, Integer>)o).getFirst().intValue(); Assert.assertEquals("emitted average value was was ", new Integer(1157 / 12), val); } }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java deleted file mode 100644 index 7d7842e..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import org.junit.Assert; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyValPair; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertKeyVal}. - * <p> - * - */ -public class ChangeAlertKeyValTest -{ - private static Logger log = LoggerFactory - .getLogger(ChangeAlertKeyValTest.class); - - /** - * Test node logic emits correct results. - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new ChangeAlertKeyVal<String, Integer>()); - testNodeProcessingSchema(new ChangeAlertKeyVal<String, Double>()); - testNodeProcessingSchema(new ChangeAlertKeyVal<String, Float>()); - testNodeProcessingSchema(new ChangeAlertKeyVal<String, Short>()); - testNodeProcessingSchema(new ChangeAlertKeyVal<String, Long>()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public <V extends Number> void testNodeProcessingSchema( - ChangeAlertKeyVal<String, V> oper) - { - CollectorTestSink alertSink = new CollectorTestSink(); - - oper.alert.setSink(alertSink); - oper.setPercentThreshold(5); - - oper.beginWindow(0); - oper.data.process(new KeyValPair<String, V>("a", oper.getValue(200))); - oper.data.process(new KeyValPair<String, V>("b", oper.getValue(10))); - oper.data.process(new KeyValPair<String, V>("c", oper.getValue(100))); - - oper.data.process(new KeyValPair<String, V>("a", oper.getValue(203))); - oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12))); - oper.data.process(new KeyValPair<String, V>("c", oper.getValue(101))); - - oper.data.process(new KeyValPair<String, V>("a", oper.getValue(210))); - oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12))); - oper.data.process(new KeyValPair<String, V>("c", oper.getValue(102))); - - oper.data.process(new KeyValPair<String, V>("a", oper.getValue(231))); - oper.data.process(new KeyValPair<String, V>("b", oper.getValue(18))); - oper.data.process(new KeyValPair<String, V>("c", oper.getValue(103))); - oper.endWindow(); - - // One for a, Two for b - Assert.assertEquals("number emitted tuples", 3, - alertSink.collectedTuples.size()); - - double aval = 0; - double bval = 0; - log.debug("\nLogging tuples"); - for (Object o : alertSink.collectedTuples) { - KeyValPair<String, KeyValPair<Number, Double>> map = (KeyValPair<String, KeyValPair<Number, Double>>)o; - - log.debug(o.toString()); - if (map.getKey().equals("a")) { - KeyValPair<Number, Double> vmap = map.getValue(); - if (vmap != null) { - aval += vmap.getValue().doubleValue(); - } - } else { - KeyValPair<Number, Double> vmap = map.getValue(); - if (vmap != null) { - bval += vmap.getValue().doubleValue(); - } - } - } - Assert.assertEquals("change in a", 10.0, aval,0); - Assert.assertEquals("change in a", 70.0, bval,0); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java deleted file mode 100644 index 51f52f0..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertMap}. - * <p> - * - */ -public class ChangeAlertMapTest -{ - private static Logger log = LoggerFactory.getLogger(ChangeAlertMapTest.class); - - /** - * Test node logic emits correct results. - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new ChangeAlertMap<String, Integer>()); - testNodeProcessingSchema(new ChangeAlertMap<String, Double>()); - testNodeProcessingSchema(new ChangeAlertMap<String, Float>()); - testNodeProcessingSchema(new ChangeAlertMap<String, Short>()); - testNodeProcessingSchema(new ChangeAlertMap<String, Long>()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public <V extends Number> void testNodeProcessingSchema( - ChangeAlertMap<String, V> oper) - { - CollectorTestSink alertSink = new CollectorTestSink(); - - oper.alert.setSink(alertSink); - oper.setPercentThreshold(5); - - oper.beginWindow(0); - HashMap<String, V> input = new HashMap<String, V>(); - input.put("a", oper.getValue(200)); - input.put("b", oper.getValue(10)); - input.put("c", oper.getValue(100)); - oper.data.process(input); - - input.clear(); - input.put("a", oper.getValue(203)); - input.put("b", oper.getValue(12)); - input.put("c", oper.getValue(101)); - oper.data.process(input); - - input.clear(); - input.put("a", oper.getValue(210)); - input.put("b", oper.getValue(12)); - input.put("c", oper.getValue(102)); - oper.data.process(input); - - input.clear(); - input.put("a", oper.getValue(231)); - input.put("b", oper.getValue(18)); - input.put("c", oper.getValue(103)); - oper.data.process(input); - oper.endWindow(); - - // One for a, Two for b - Assert.assertEquals("number emitted tuples", 3, - alertSink.collectedTuples.size()); - - double aval = 0; - double bval = 0; - log.debug("\nLogging tuples"); - for (Object o : alertSink.collectedTuples) { - HashMap<String, HashMap<Number, Double>> map = (HashMap<String, HashMap<Number, Double>>)o; - Assert.assertEquals("map size", 1, map.size()); - log.debug(o.toString()); - HashMap<Number, Double> vmap = map.get("a"); - if (vmap != null) { - aval += vmap.get(231.0).doubleValue(); - } - vmap = map.get("b"); - if (vmap != null) { - if (vmap.get(12.0) != null) { - bval += vmap.get(12.0).doubleValue(); - } else { - bval += vmap.get(18.0).doubleValue(); - } - } - } - Assert.assertEquals("change in a", 10.0, aval,0); - Assert.assertEquals("change in a", 70.0, bval,0); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java deleted file mode 100644 index d127231..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyValPair; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.ChangeAlert}. <p> - * - */ -public class ChangeAlertTest -{ - private static Logger log = LoggerFactory.getLogger(ChangeAlertTest.class); - - /** - * Test node logic emits correct results. - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new ChangeAlert<Integer>()); - testNodeProcessingSchema(new ChangeAlert<Double>()); - testNodeProcessingSchema(new ChangeAlert<Float>()); - testNodeProcessingSchema(new ChangeAlert<Short>()); - testNodeProcessingSchema(new ChangeAlert<Long>()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public <V extends Number> void testNodeProcessingSchema(ChangeAlert<V> oper) - { - CollectorTestSink alertSink = new CollectorTestSink(); - - oper.alert.setSink(alertSink); - oper.setPercentThreshold(5); - - oper.beginWindow(0); - oper.data.process(oper.getValue(10)); - oper.data.process(oper.getValue(12)); // alert - oper.data.process(oper.getValue(12)); - oper.data.process(oper.getValue(18)); // alert - oper.data.process(oper.getValue(0)); // alert - oper.data.process(oper.getValue(20)); // this will not alert - oper.data.process(oper.getValue(30)); // alert - - oper.endWindow(); - - // One for a, Two for b - Assert.assertEquals("number emitted tuples", 4, alertSink.collectedTuples.size()); - - double aval = 0; - log.debug("\nLogging tuples"); - for (Object o: alertSink.collectedTuples) { - KeyValPair<Number, Double> map = (KeyValPair<Number, Double>)o; - log.debug(o.toString()); - aval += map.getValue().doubleValue(); - } - Assert.assertEquals("change in a", 220.0, aval,0); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java deleted file mode 100644 index 6c2151b..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyValPair; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.ChangeKeyVal}. - * <p> - * - */ -public class ChangeKeyValTest -{ - private static Logger log = LoggerFactory.getLogger(ChangeKeyValTest.class); - - /** - * Test node logic emits correct results. - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new ChangeKeyVal<String, Integer>()); - testNodeProcessingSchema(new ChangeKeyVal<String, Double>()); - testNodeProcessingSchema(new ChangeKeyVal<String, Float>()); - testNodeProcessingSchema(new ChangeKeyVal<String, Short>()); - testNodeProcessingSchema(new ChangeKeyVal<String, Long>()); - } - - /** - * - * @param oper - * key/value pair for comparison. - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public <V extends Number> void testNodeProcessingSchema( - ChangeKeyVal<String, V> oper) - { - CollectorTestSink changeSink = new CollectorTestSink(); - CollectorTestSink percentSink = new CollectorTestSink(); - - oper.change.setSink(changeSink); - oper.percent.setSink(percentSink); - - oper.beginWindow(0); - oper.base.process(new KeyValPair<String, V>("a", oper.getValue(2))); - oper.base.process(new KeyValPair<String, V>("b", oper.getValue(10))); - oper.base.process(new KeyValPair<String, V>("c", oper.getValue(100))); - - oper.data.process(new KeyValPair<String, V>("a", oper.getValue(3))); - oper.data.process(new KeyValPair<String, V>("b", oper.getValue(2))); - oper.data.process(new KeyValPair<String, V>("c", oper.getValue(4))); - - oper.endWindow(); - - // One for each key - Assert.assertEquals("number emitted tuples", 3, - changeSink.collectedTuples.size()); - Assert.assertEquals("number emitted tuples", 3, - percentSink.collectedTuples.size()); - - log.debug("\nLogging tuples"); - for (Object o : changeSink.collectedTuples) { - KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o; - if (kv.getKey().equals("a")) { - Assert.assertEquals("change in a ", 1.0, kv.getValue()); - } - if (kv.getKey().equals("b")) { - Assert.assertEquals("change in b ", -8.0, kv.getValue()); - } - if (kv.getKey().equals("c")) { - Assert.assertEquals("change in c ", -96.0, kv.getValue()); - } - } - - for (Object o : percentSink.collectedTuples) { - KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o; - if (kv.getKey().equals("a")) { - Assert.assertEquals("change in a ", 50.0, kv.getValue()); - } - if (kv.getKey().equals("b")) { - Assert.assertEquals("change in b ", -80.0, kv.getValue()); - } - if (kv.getKey().equals("c")) { - Assert.assertEquals("change in c ", -96.0, kv.getValue()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java deleted file mode 100644 index 9595a5d..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.Change}. - * <p> - * - */ -public class ChangeTest -{ - private static Logger log = LoggerFactory.getLogger(ChangeTest.class); - - /** - * Test node logic emits correct results. - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new Change<Integer>()); - testNodeProcessingSchema(new Change<Double>()); - testNodeProcessingSchema(new Change<Float>()); - testNodeProcessingSchema(new Change<Short>()); - testNodeProcessingSchema(new Change<Long>()); - } - - /** - * - * @param oper Data value for comparison. - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public <V extends Number> void testNodeProcessingSchema(Change<V> oper) - { - CollectorTestSink changeSink = new CollectorTestSink(); - CollectorTestSink percentSink = new CollectorTestSink(); - - oper.change.setSink(changeSink); - oper.percent.setSink(percentSink); - - oper.beginWindow(0); - oper.base.process(oper.getValue(10)); - oper.data.process(oper.getValue(5)); - oper.data.process(oper.getValue(15)); - oper.data.process(oper.getValue(20)); - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", 3, - changeSink.collectedTuples.size()); - Assert.assertEquals("number emitted tuples", 3, - percentSink.collectedTuples.size()); - - log.debug("\nLogging tuples"); - for (Object o : changeSink.collectedTuples) { - log.debug(String.format("change %s", o)); - } - for (Object o : percentSink.collectedTuples) { - log.debug(String.format("percent change %s", o)); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java b/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java deleted file mode 100644 index 46b9609..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.CompareExceptMap}<p> - * - */ -public class CompareExceptMapTest -{ - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new CompareExceptMap<String, Integer>()); - testNodeProcessingSchema(new CompareExceptMap<String, Double>()); - testNodeProcessingSchema(new CompareExceptMap<String, Float>()); - testNodeProcessingSchema(new CompareExceptMap<String, Short>()); - testNodeProcessingSchema(new CompareExceptMap<String, Long>()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testNodeProcessingSchema(CompareExceptMap oper) - { - CountAndLastTupleTestSink compareSink = new CountAndLastTupleTestSink(); - CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink(); - oper.compare.setSink(compareSink); - oper.except.setSink(exceptSink); - - oper.setKey("a"); - oper.setValue(3.0); - oper.setTypeEQ(); - - oper.beginWindow(0); - HashMap<String, Number> input = new HashMap<String, Number>(); - input.put("a", 2); - input.put("b", 20); - input.put("c", 1000); - oper.data.process(input); - input.clear(); - input.put("a", 3); - input.put("b", 21); - input.put("c", 30); - oper.data.process(input); - oper.endWindow(); - - // One for each key - Assert.assertEquals("number emitted tuples", 1, exceptSink.count); - for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple).entrySet()) { - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0); - } - } - - Assert.assertEquals("number emitted tuples", 1, compareSink.count); - for (Map.Entry<String, Number> e : ((HashMap<String, Number>)compareSink.tuple).entrySet()) { - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", new Double(3), e.getValue().doubleValue(), 0); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", new Double(21), e.getValue().doubleValue(), 0); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", new Double(30), e.getValue().doubleValue(), 0); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java b/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java deleted file mode 100644 index c21019b..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.CompareMap}<p> - * - */ -public class CompareMapTest -{ - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new CompareMap<String, Integer>()); - testNodeProcessingSchema(new CompareMap<String, Double>()); - testNodeProcessingSchema(new CompareMap<String, Float>()); - testNodeProcessingSchema(new CompareMap<String, Short>()); - testNodeProcessingSchema(new CompareMap<String, Long>()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testNodeProcessingSchema(CompareMap oper) - { - CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); - oper.compare.setSink(matchSink); - oper.setKey("a"); - oper.setValue(3.0); - oper.setTypeNEQ(); - - oper.beginWindow(0); - HashMap<String, Number> input = new HashMap<String, Number>(); - - input.put("a", 2); - input.put("b", 20); - input.put("c", 1000); - oper.data.process(input); - input.clear(); - input.put("a", 3); - oper.data.process(input); - oper.endWindow(); - - // One for each key - Assert.assertEquals("number emitted tuples", 1, matchSink.count); - for (Map.Entry<String, Number> e : ((HashMap<String, Number>)matchSink.tuple).entrySet()) { - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java deleted file mode 100644 index 0b1d9de..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyValPair; - -/** - * - * Functional tests for {@link com.datatorrent.lib.math.CountKeyVal}. <p> - * - */ -public class CountKeyValTest -{ - /** - * Test operator logic emits correct results. - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testNodeProcessing() - { - CountKeyVal<String, Double> oper = new CountKeyVal<String, Double>(); - CollectorTestSink countSink = new CollectorTestSink(); - oper.count.setSink(countSink); - - oper.beginWindow(0); // - - oper.data.process(new KeyValPair("a", 2.0)); - oper.data.process(new KeyValPair("b", 20.0)); - oper.data.process(new KeyValPair("c", 1000.0)); - oper.data.process(new KeyValPair("a", 1.0)); - oper.data.process(new KeyValPair("a", 10.0)); - oper.data.process(new KeyValPair("b", 5.0)); - oper.data.process(new KeyValPair("d", 55.0)); - oper.data.process(new KeyValPair("b", 12.0)); - oper.data.process(new KeyValPair("d", 22.0)); - oper.data.process(new KeyValPair("d", 14.2)); - oper.data.process(new KeyValPair("d", 46.0)); - oper.data.process(new KeyValPair("e", 2.0)); - oper.data.process(new KeyValPair("a", 23.0)); - oper.data.process(new KeyValPair("d", 4.0)); - - oper.endWindow(); // - - // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e" - Assert.assertEquals("number emitted tuples", 5, countSink.collectedTuples.size()); - for (Object o : countSink.collectedTuples) { - KeyValPair<String, Integer> e = (KeyValPair<String, Integer>)o; - Integer val = (Integer)e.getValue(); - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", 4, val.intValue()); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", 3, val.intValue()); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", 1, val.intValue()); - } else if (e.getKey().equals("d")) { - Assert.assertEquals("emitted tuple for 'd' was ", 5, val.intValue()); - } else if (e.getKey().equals("e")) { - Assert.assertEquals("emitted tuple for 'e' was ", 1, val.intValue()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java b/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java index 9fe556f..c7e7c65 100644 --- a/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java +++ b/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java @@ -29,6 +29,7 @@ import com.datatorrent.lib.testbench.CollectorTestSink; * <p> * */ + public class DivisionTest { /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java b/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java deleted file mode 100644 index a113d5f..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; - -/** - * Functional tests for {@link com.datatorrent.lib.math.ExceptMap} - */ -public class ExceptMapTest -{ - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new ExceptMap<String, Integer>()); - testNodeProcessingSchema(new ExceptMap<String, Double>()); - testNodeProcessingSchema(new ExceptMap<String, Float>()); - testNodeProcessingSchema(new ExceptMap<String, Short>()); - testNodeProcessingSchema(new ExceptMap<String, Long>()); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - public void testNodeProcessingSchema(ExceptMap oper) - { - CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink(); - oper.except.setSink(exceptSink); - oper.setKey("a"); - oper.setValue(3.0); - oper.setTypeEQ(); - - oper.beginWindow(0); - HashMap<String, Number> input = new HashMap<String, Number>(); - input.put("a", 2); - input.put("b", 20); - input.put("c", 1000); - oper.data.process(input); - input.clear(); - input.put("a", 3); - oper.data.process(input); - oper.endWindow(); - - // One for each key - Assert.assertEquals("number emitted tuples", 1, exceptSink.count); - for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple) - .entrySet()) { - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", new Double(2), e - .getValue().doubleValue(), 0); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e - .getValue().doubleValue(), 0); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e - .getValue().doubleValue(), 0); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java index 68e89eb..4eb6a1b 100644 --- a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java +++ b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java @@ -24,8 +24,10 @@ import org.junit.Test; import com.datatorrent.lib.testbench.SumTestSink; /** + * * Functional tests for {@link com.datatorrent.lib.math.MultiplyByConstant} */ + public class MultiplyByConstantTest { /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java deleted file mode 100644 index 92c0e77..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; - -/** - * Functional tests for {@link com.datatorrent.lib.math.QuotientMap} - */ -public class QuotientMapTest -{ - private static Logger LOG = LoggerFactory.getLogger(QuotientMap.class); - - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new QuotientMap<String, Integer>()); - testNodeProcessingSchema(new QuotientMap<String, Double>()); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testNodeProcessingSchema(QuotientMap oper) throws Exception - { - CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink(); - - oper.quotient.setSink(quotientSink); - oper.setMult_by(2); - - oper.beginWindow(0); // - HashMap<String, Number> input = new HashMap<String, Number>(); - int numtuples = 100; - for (int i = 0; i < numtuples; i++) { - input.clear(); - input.put("a", 2); - input.put("b", 20); - input.put("c", 1000); - oper.numerator.process(input); - input.clear(); - input.put("a", 2); - input.put("b", 40); - input.put("c", 500); - oper.denominator.process(input); - } - - oper.endWindow(); - - // One for each key - Assert.assertEquals("number emitted tuples", 1, quotientSink.count); - HashMap<String, Number> output = (HashMap<String, Number>)quotientSink.tuple; - for (Map.Entry<String, Number> e : output.entrySet()) { - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", 2d, - e.getValue()); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", 1d, - e.getValue()); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", 4d, - e.getValue()); - } else { - LOG.debug(String.format("key was %s", e.getKey())); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java deleted file mode 100644 index 604e45f..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.api.Sink; - -/** - * Functional tests for {@link com.datatorrent.lib.math.Quotient} - */ -public class QuotientTest -{ - - class TestSink implements Sink<Object> - { - List<Object> collectedTuples = new ArrayList<Object>(); - - @Override - public void put(Object payload) - { - collectedTuples.add(payload); - } - - @Override - public int getCount(boolean reset) - { - throw new UnsupportedOperationException("Not supported yet."); - } - } - - /** - * Test oper logic emits correct results. - */ - @Test - public void testNodeSchemaProcessing() - { - Quotient<Double> oper = new Quotient<Double>(); - TestSink quotientSink = new TestSink(); - oper.quotient.setSink(quotientSink); - - oper.setMult_by(2); - - oper.beginWindow(0); // - Double a = 30.0; - Double b = 20.0; - Double c = 100.0; - oper.denominator.process(a); - oper.denominator.process(b); - oper.denominator.process(c); - - a = 5.0; - oper.numerator.process(a); - a = 1.0; - oper.numerator.process(a); - b = 44.0; - oper.numerator.process(b); - - b = 10.0; - oper.numerator.process(b); - c = 22.0; - oper.numerator.process(c); - c = 18.0; - oper.numerator.process(c); - - a = 0.5; - oper.numerator.process(a); - b = 41.5; - oper.numerator.process(b); - a = 8.0; - oper.numerator.process(a); - oper.endWindow(); // - - // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e" - Assert.assertEquals("number emitted tuples", 1, - quotientSink.collectedTuples.size()); - for (Object o : quotientSink.collectedTuples) { // sum is 1157 - Double val = (Double)o; - Assert.assertEquals("emitted quotient value was ", new Double(2.0), val); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java index e968dba..f74e0c9 100644 --- a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java +++ b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java @@ -26,11 +26,12 @@ import org.junit.Test; import com.datatorrent.lib.testbench.SumTestSink; /** - * + * * Functional tests for {@link com.datatorrent.lib.math.Sigma} * <p> * */ + public class SigmaTest { /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java b/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java deleted file mode 100644 index b0c7b38..0000000 --- a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional tests for {@link com.datatorrent.lib.math.SumCountMap}. - */ -public class SumCountMapTest -{ - /** - * Test operator logic emits correct results. - */ - @Test - public void testNodeProcessing() - { - testNodeSchemaProcessing(true, true); - testNodeSchemaProcessing(true, false); - testNodeSchemaProcessing(false, true); - testNodeSchemaProcessing(false, false); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testNodeSchemaProcessing(boolean sum, boolean count) - { - SumCountMap<String, Double> oper = new SumCountMap<String, Double>(); - oper.setType(Double.class); - CollectorTestSink sumSink = new CollectorTestSink(); - CollectorTestSink countSink = new CollectorTestSink(); - if (sum) { - oper.sum.setSink(sumSink); - } - if (count) { - oper.count.setSink(countSink); - } - - oper.beginWindow(0); // - - HashMap<String, Double> input = new HashMap<String, Double>(); - - input.put("a", 2.0); - input.put("b", 20.0); - input.put("c", 1000.0); - oper.data.process(input); - input.clear(); - input.put("a", 1.0); - oper.data.process(input); - input.clear(); - input.put("a", 10.0); - input.put("b", 5.0); - oper.data.process(input); - input.clear(); - input.put("d", 55.0); - input.put("b", 12.0); - oper.data.process(input); - input.clear(); - input.put("d", 22.0); - oper.data.process(input); - input.clear(); - input.put("d", 14.2); - oper.data.process(input); - input.clear(); - - // Mix integers and doubles - HashMap<String, Double> inputi = new HashMap<String, Double>(); - inputi.put("d", 46.0); - inputi.put("e", 2.0); - oper.data.process(inputi); - inputi.clear(); - inputi.put("a", 23.0); - inputi.put("d", 4.0); - oper.data.process(inputi); - inputi.clear(); - - oper.endWindow(); // - - if (sum) { - // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e" - Assert.assertEquals("number emitted tuples", 1, sumSink.collectedTuples.size()); - - for (Object o : sumSink.collectedTuples) { - HashMap<String, Object> output = (HashMap<String, Object>)o; - for (Map.Entry<String, Object> e : output.entrySet()) { - Double val = (Double)e.getValue(); - if (e.getKey().equals("a")) { - Assert.assertEquals("emitted value for 'a' was ", new Double(36), - val); - } else if (e.getKey().equals("b")) { - Assert.assertEquals("emitted tuple for 'b' was ", new Double(37), - val); - } else if (e.getKey().equals("c")) { - Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), - val); - } else if (e.getKey().equals("d")) { - Assert.assertEquals("emitted tuple for 'd' was ", - new Double(141.2), val); - } else if (e.getKey().equals("e")) { - Assert.assertEquals("emitted tuple for 'e' was ", new Double(2), - val); - } - } - } - } - if (count) { - // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e" - Assert.assertEquals("number emitted tuples", 1, countSink.collectedTuples.size()); - for (Object o : countSink.collectedTuples) { - HashMap<String, Object> output = (HashMap<String, Object>)o; - for (Map.Entry<String, Object> e : output.entrySet()) { - Integer val = (Integer)e.getValue(); - if (e.getKey().equals("a")) { - Assert - .assertEquals("emitted value for 'a' was ", 4, val.intValue()); - } else if (e.getKey().equals("b")) { - Assert - .assertEquals("emitted tuple for 'b' was ", 3, val.intValue()); - } else if (e.getKey().equals("c")) { - Assert - .assertEquals("emitted tuple for 'c' was ", 1, val.intValue()); - } else if (e.getKey().equals("d")) { - Assert - .assertEquals("emitted tuple for 'd' was ", 5, val.intValue()); - } else if (e.getKey().equals("e")) { - Assert - .assertEquals("emitted tuple for 'e' was ", 1, val.intValue()); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java deleted file mode 100644 index 1f29d1d..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.EqualValueCondition; -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional test for {@link com.datatorrent.lib.streamquery.DeleteOperator}. - */ -public class DeleteOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - DeleteOperator oper = new DeleteOperator(); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java deleted file mode 100644 index 728fb96..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.Condition; -import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -public class FullOuterJoinOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - OuterJoinOperator oper = new OuterJoinOperator(); - oper.setFullJoin(true); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - // set column join condition - Condition cond = new JoinColumnEqualCondition("a", "a"); - oper.setJoinCondition(cond); - - // add columns - oper.selectTable1Column(new ColumnIndex("b", null)); - oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 11); - tuple.put("c", 12); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java deleted file mode 100644 index 714f93b..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.EqualValueCondition; -import com.datatorrent.lib.streamquery.function.SumFunction; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional test for {@link com.datatorrent.lib.streamquery.GroupByOperatorTest}. - */ -public class GroupByOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlGroupBy() - { - // create operator - GroupByHavingOperator oper = new GroupByHavingOperator(); - oper.addColumnGroupByIndex(new ColumnIndex("b", null)); - try { - oper.addAggregateIndex(new SumFunction("c", null)); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - return; - } - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 2); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 2); - tuple.put("c", 7); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java deleted file mode 100644 index e11723d..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.EqualValueCondition; -import com.datatorrent.lib.streamquery.condition.HavingCompareValue; -import com.datatorrent.lib.streamquery.condition.HavingCondition; -import com.datatorrent.lib.streamquery.function.FunctionIndex; -import com.datatorrent.lib.streamquery.function.SumFunction; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional test for {@link com.datatorrent.lib.streamquery.HavingOperatorTest}. - */ -public class HavingOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlGroupBy() throws Exception - { - // create operator - GroupByHavingOperator oper = new GroupByHavingOperator(); - oper.addColumnGroupByIndex(new ColumnIndex("b", null)); - FunctionIndex sum = new SumFunction("c", null); - oper.addAggregateIndex(sum); - - // create having condition - HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0); - oper.addHavingCondition(having); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 2); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 2); - tuple.put("c", 7); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java deleted file mode 100644 index 8a022ee..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.Condition; -import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * - * Functional test for {@link com.datatorrent.lib.streamquery.InnerJoinOperator }. - * - */ -public class InnerJoinOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - InnerJoinOperator oper = new InnerJoinOperator(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - // set column join condition - Condition cond = new JoinColumnEqualCondition("a", "a"); - oper.setJoinCondition(cond); - - // add columns - oper.selectTable1Column(new ColumnIndex("b", null)); - oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java deleted file mode 100644 index aa25e87..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.Condition; -import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -public class LeftOuterJoinOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - OuterJoinOperator oper = new OuterJoinOperator(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - // set column join condition - Condition cond = new JoinColumnEqualCondition("a", "a"); - oper.setJoinCondition(cond); - - // add columns - oper.selectTable1Column(new ColumnIndex("b", null)); - oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 11); - tuple.put("c", 12); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java deleted file mode 100644 index 2d7ba87..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional test for {@link com.datatorrent.lib.streamquery.OrderByOperatorTest}. - */ -public class OrderByOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // craete operator - OrderByOperator oper = new OrderByOperator(); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - oper.addOrderByRule(new OrderByRule<Integer>("b")); - oper.setDescending(true); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("c", 2); - tuple.put("a", 0); - tuple.put("b", 1); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 6); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 4); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 8); - tuple.put("c", 4); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java deleted file mode 100644 index 3a57427..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.Condition; -import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -public class RightOuterJoinOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - OuterJoinOperator oper = new OuterJoinOperator(); - oper.setRighttJoin(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - // set column join condition - Condition cond = new JoinColumnEqualCondition("a", "a"); - oper.setJoinCondition(cond); - - // add columns - oper.selectTable1Column(new ColumnIndex("b", null)); - oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 11); - tuple.put("c", 12); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java deleted file mode 100644 index 8e6620e..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.EqualValueCondition; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional test for {@link com.datatorrent.lib.streamquery.SelectOperatorTest}. - */ -public class SelectOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - SelectOperator oper = new SelectOperator(); - oper.addIndex(new ColumnIndex("b", null)); - oper.addIndex(new ColumnIndex("c", null)); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java deleted file mode 100644 index c92c6c1..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.testbench.CollectorTestSink; - -public class SelectTopOperatorTest -{ - @SuppressWarnings({"rawtypes", "unchecked"}) - @Test - public void testOperator() throws Exception - { - SelectTopOperator oper = new SelectTopOperator(); - oper.setTopValue(2); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.beginWindow(1); - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - oper.endWindow(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java deleted file mode 100644 index 42af56b..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.condition.EqualValueCondition; -import com.datatorrent.lib.testbench.CollectorTestSink; - -public class UpdateOperatorTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - UpdateOperator oper = new UpdateOperator(); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - oper.addUpdate("c", 100); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java deleted file mode 100644 index b0500eb..0000000 --- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.advanced; - -import java.util.HashMap; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.streamquery.SelectOperator; -import com.datatorrent.lib.streamquery.condition.BetweenCondition; -import com.datatorrent.lib.streamquery.index.ColumnIndex; -import com.datatorrent.lib.testbench.CollectorTestSink; - -/** - * Functional test for {@link com.datatorrent.lib.streamquery.advanced.BetweenConditionTest}. - */ -public class BetweenConditionTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSqlSelect() - { - // create operator - SelectOperator oper = new SelectOperator(); - oper.addIndex(new ColumnIndex("b", null)); - oper.addIndex(new ColumnIndex("c", null)); - - BetweenCondition cond = new BetweenCondition("a", 0, 2); - oper.setCondition(cond); - - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 3); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - LOG.debug("{}", sink.collectedTuples); - } - - private static final Logger LOG = LoggerFactory.getLogger(BetweenConditionTest.class); -}
