http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java new file mode 100644 index 0000000..6ad818e --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; +import org.apache.apex.malhar.contrib.misc.streamquery.function.SumFunction; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.GroupByOperatorTest}. + * @deprecated + */ +@Deprecated +public class GroupByOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlGroupBy() + { + // create operator + GroupByHavingOperator oper = new GroupByHavingOperator(); + oper.addColumnGroupByIndex(new ColumnIndex("b", null)); + try { + oper.addAggregateIndex(new SumFunction("c", null)); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return; + } + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 2); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 2); + tuple.put("c", 7); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class); + +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java new file mode 100644 index 0000000..5b696f1 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCompareValue; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCondition; +import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex; +import org.apache.apex.malhar.contrib.misc.streamquery.function.SumFunction; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.HavingOperatorTest}. + * @deprecated + */ +@Deprecated +public class HavingOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlGroupBy() throws Exception + { + // create operator + GroupByHavingOperator oper = new GroupByHavingOperator(); + oper.addColumnGroupByIndex(new ColumnIndex("b", null)); + FunctionIndex sum = new SumFunction("c", null); + oper.addAggregateIndex(sum); + + // create having condition + HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0); + oper.addHavingCondition(having); + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 2); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 2); + tuple.put("c", 7); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java new file mode 100644 index 0000000..8b4f923 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * + * Functional test for {@link com.datatorrent.lib.streamquery.InnerJoinOperator }. + * @deprecated + */ +@Deprecated +public class InnerJoinOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + InnerJoinOperator oper = new InnerJoinOperator(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + // set column join condition + Condition cond = new JoinColumnEqualCondition("a", "a"); + oper.setJoinCondition(cond); + + // add columns + oper.selectTable1Column(new ColumnIndex("b", null)); + oper.selectTable2Column(new ColumnIndex("c", null)); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java new file mode 100644 index 0000000..f78ba21 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +@Deprecated +public class LeftOuterJoinOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + OuterJoinOperator oper = new OuterJoinOperator(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + // set column join condition + Condition cond = new JoinColumnEqualCondition("a", "a"); + oper.setJoinCondition(cond); + + // add columns + oper.selectTable1Column(new ColumnIndex("b", null)); + oper.selectTable2Column(new ColumnIndex("c", null)); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 11); + tuple.put("c", 12); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java new file mode 100644 index 0000000..88aa2d0 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link com.datatorrent.lib.streamquery.OrderByOperatorTest}. + * @deprecated + */ +@Deprecated +public class OrderByOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // craete operator + OrderByOperator oper = new OrderByOperator(); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + oper.addOrderByRule(new OrderByRule<Integer>("b")); + oper.setDescending(true); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("c", 2); + tuple.put("a", 0); + tuple.put("b", 1); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 6); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 4); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 8); + tuple.put("c", 4); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java new file mode 100644 index 0000000..8142276 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +@Deprecated +public class RightOuterJoinOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + OuterJoinOperator oper = new OuterJoinOperator(); + oper.setRighttJoin(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + // set column join condition + Condition cond = new JoinColumnEqualCondition("a", "a"); + oper.setJoinCondition(cond); + + // add columns + oper.selectTable1Column(new ColumnIndex("b", null)); + oper.selectTable2Column(new ColumnIndex("c", null)); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 11); + tuple.put("c", 12); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java new file mode 100644 index 0000000..c5acbdd --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class SelectOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new ColumnIndex("b", null)); + oper.addIndex(new ColumnIndex("c", null)); + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java new file mode 100644 index 0000000..90480cf --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +@Deprecated +public class SelectTopOperatorTest +{ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testOperator() throws Exception + { + SelectTopOperator oper = new SelectTopOperator(); + oper.setTopValue(2); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.beginWindow(1); + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + oper.endWindow(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java new file mode 100644 index 0000000..f480cc7 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +@Deprecated +public class UpdateOperatorTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + UpdateOperator oper = new UpdateOperator(); + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + oper.addUpdate("c", 100); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java new file mode 100644 index 0000000..01465db --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.BetweenCondition; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.advanced.BetweenConditionTest}. + * @deprecated + */ +@Deprecated +public class BetweenConditionTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new ColumnIndex("b", null)); + oper.addIndex(new ColumnIndex("c", null)); + + BetweenCondition cond = new BetweenCondition("a", 0, 2); + oper.setCondition(cond); + + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 3); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(BetweenConditionTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java new file mode 100644 index 0000000..e160e5d --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.CompoundCondition; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.advanced.CompoundConditionTest}. + * @deprecated + */ +@Deprecated +public class CompoundConditionTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new ColumnIndex("b", null)); + oper.addIndex(new ColumnIndex("c", null)); + + EqualValueCondition left = new EqualValueCondition(); + left.addEqualValue("a", 1); + EqualValueCondition right = new EqualValueCondition(); + right.addEqualValue("b", 1); + + oper.setCondition(new CompoundCondition(left, right)); + + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 3); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(CompoundConditionTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java new file mode 100644 index 0000000..d641a1c --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.InCondition; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.advanced.InConditionTest}. + * @deprecated + */ +@Deprecated +public class InConditionTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new ColumnIndex("b", null)); + oper.addIndex(new ColumnIndex("c", null)); + + InCondition cond = new InCondition("a"); + cond.addInValue(0); + cond.addInValue(1); + oper.setCondition(cond); + + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 3); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(InConditionTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java new file mode 100644 index 0000000..e09d47a --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.LikeCondition; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.advanced.LikeConditionTest}. + * @deprecated + */ +@Deprecated +public class LikeConditionTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new ColumnIndex("b", null)); + oper.addIndex(new ColumnIndex("c", null)); + + LikeCondition condition = new LikeCondition("a", "test*"); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", "testing"); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", "null"); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", "testall"); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(LikeConditionTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java new file mode 100644 index 0000000..745c8c9 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.index.NegateExpression; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class NegateIndexTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new NegateExpression("b", null)); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(NegateIndexTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java new file mode 100644 index 0000000..c1fa8b7 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.function.AverageFunction; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class SelectAverageTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectFunctionOperator oper = new SelectFunctionOperator(); + oper.addSqlFunction(new AverageFunction("b", null)); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SelectAverageTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java new file mode 100644 index 0000000..8ac1e33 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.function.CountFunction; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class SelectCountTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectFunctionOperator oper = new SelectFunctionOperator(); + oper.addSqlFunction(new CountFunction("b", null)); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", null); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", null); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SelectCountTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java new file mode 100644 index 0000000..17341da --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.function.FirstLastFunction; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class SelectFirstLastTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectFunctionOperator oper = new SelectFunctionOperator(); + oper.addSqlFunction(new FirstLastFunction("b", null, false)); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", null); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", null); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SelectFirstLastTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java new file mode 100644 index 0000000..fa347c1 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.function.MaxMinFunction; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class SelectMaxMinTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectFunctionOperator oper = new SelectFunctionOperator(); + oper.addSqlFunction(new MaxMinFunction("b", null, false)); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SelectMaxMinTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java new file mode 100644 index 0000000..0d374dc --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.advanced; + +import java.util.HashMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.index.SumExpression; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}. + * @deprecated + */ +@Deprecated +public class SumIndexTest +{ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSqlSelect() + { + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new SumExpression("b", "c", null)); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); + } + + private static final Logger LOG = LoggerFactory.getLogger(SumIndexTest.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/demos/yahoofinance/pom.xml ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml index 12e5eb0..d92cf2a 100644 --- a/demos/yahoofinance/pom.xml +++ b/demos/yahoofinance/pom.xml @@ -49,6 +49,17 @@ <artifactId>derby</artifactId> <version>10.9.1.0</version> </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java index 50b306d..1a38495 100644 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java +++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java @@ -18,14 +18,14 @@ */ package com.datatorrent.demos.yahoofinance; +import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.DerbySqlStreamOperator; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator; -import com.datatorrent.lib.streamquery.DerbySqlStreamOperator; /** * This demo will output the stock market data from yahoo finance http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java b/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java deleted file mode 100644 index 252ea2f..0000000 --- a/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.algo; - -import java.util.Iterator; -import java.util.List; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang3.mutable.MutableInt; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.annotation.OperatorAnnotation; -import com.datatorrent.common.util.BaseOperator; - -/** - * <p> - * This operator searches for a given pattern in the input stream.<br> - * For e.g. If the pattern is defined as âaaâ and your input events arrive in following manner âaâ, âaâ, âaâ, then this operator - * will emit 2 matches for the given pattern. One matching event 1 and 2 and other matching 2 and 3. - * </p> - * - * <br> - * <b> StateFull : Yes, </b> Pattern is found over application window(s). <br> - * <b> Partitionable : No, </b> will yield wrong result. <br> - * - * <br> - * <b>Ports</b>:<br> - * <b>inputPort</b>: the port to receive input<br> - * - * <br> - * <b>Properties</b>:<br> - * <b>pattern</b>: The pattern that needs to be searched<br> - * - * @param <T> event type - * - * @since 2.0.0 - */ - -@OperatorAnnotation(partitionable = false) -public abstract class AbstractStreamPatternMatcher<T> extends BaseOperator -{ - /** - * The pattern to be searched in the input stream of events - */ - @NotNull - private Pattern<T> pattern; - - // this stores the index of the partial matches found so far - private List<MutableInt> partialMatches = Lists.newLinkedList(); - private transient MutableInt patternLength; - - /** - * Set the pattern that needs to be searched in the input stream of events - * - * @param pattern The pattern to be searched - */ - public void setPattern(Pattern<T> pattern) - { - this.pattern = pattern; - partialMatches.clear(); - patternLength = new MutableInt(pattern.getStates().length - 1); - } - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - patternLength = new MutableInt(pattern.getStates().length - 1); - } - - /** - * Get the pattern that is searched in the input stream of events - * - * @return Returns the pattern searched - */ - public Pattern<T> getPattern() - { - return pattern; - } - - public transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() - { - @Override - public void process(T t) - { - if (pattern.checkState(t, 0)) { - partialMatches.add(new MutableInt(-1)); - } - if (partialMatches.size() > 0) { - MutableInt tempInt; - Iterator<MutableInt> itr = partialMatches.iterator(); - while (itr.hasNext()) { - tempInt = itr.next(); - tempInt.increment(); - if (!pattern.checkState(t, tempInt.intValue())) { - itr.remove(); - } else if (tempInt.equals(patternLength)) { - itr.remove(); - processPatternFound(); - } - } - } - } - }; - - /** - * This function determines how to process the pattern found - */ - public abstract void processPatternFound(); - - public static class Pattern<T> - { - /** - * The states of the pattern - */ - @NotNull - private final T[] states; - - //for kryo - private Pattern() - { - states = null; - } - - public Pattern(@NotNull T[] states) - { - this.states = states; - } - - /** - * Checks if the input state matches the state at index "index" of the pattern - * - * @param t The input state - * @param index The index to match in the pattern - * @return True if the state exists at index "index" else false - */ - public boolean checkState(T t, int index) - { - return states[index].equals(t); - } - - /** - * Get the states of the pattern - * - * @return The states of the pattern - */ - public T[] getStates() - { - return states; - } - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java b/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java deleted file mode 100644 index 82d2201..0000000 --- a/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.algo; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OperatorAnnotation; - -import com.datatorrent.lib.util.BaseMatchOperator; - -/** - * This operator takes Maps, whose values are numbers, as input tuples. - * It then performs a numeric comparison on the values corresponding to one of the keys in the input tuple maps. - * All tuples processed by the operator before the first successful comparison are not output by the operator, - * all tuples processed by the operator after and including a successful comparison are output by the operator. - * - * <p> - * A compare metric is done on input tuple based on the property "key", - * "value", and "cmp" type. All tuples are emitted (inclusive) once a match is made. - * The comparison is done by getting double value from the Number. - * This module is a pass through<br> - * <br> - * <b> StateFull : Yes, </b> Count is aggregated over application window(s). <br> - * <b> Partitions : No, </b> will yield wrong result. <br> - * <br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects Map<K,V extends Number><br> - * <b>allafter</b>: emits Map<K,V extends Number> if compare function - * returns true<br> - * <br> - * <b>Properties</b>:<br> - * <b>key</b>: The key on which compare is done<br> - * <b>value</b>: The value to compare with<br> - * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", - * "neq", "gt", "gte". Default is "eq"<br> - * <br> - * <b>Specific compile time checks</b>:<br> - * Key must be non empty<br> - * Value must be able to convert to a "double"<br> - * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", - * "gte"<br> - * <b>Specific run time checks</b>: None<br> - * <br> - * </p> - * - * @displayName Emit All After Match (Number) - * @category Rules and Alerts - * @tags filter, compare, numeric, key value - * - * @since 0.3.2 - */ -@OperatorAnnotation(partitionable = false) -public class AllAfterMatchMap<K, V extends Number> extends - BaseMatchOperator<K, V> -{ - /** - * The input port on which tuples are received. - */ - public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() - { - /** - * Process HashMap<K,V> and emit all tuples at and after match - */ - @Override - public void process(Map<K, V> tuple) - { - if (doemit) { - allafter.emit(cloneTuple(tuple)); - return; - } - V v = tuple.get(getKey()); - if (v == null) { // error tuple - return; - } - if (compareValue(v.doubleValue())) { - doemit = true; - allafter.emit(cloneTuple(tuple)); - } - } - }; - - /** - * The output port on which all tuples after a match are emitted. - */ - public final transient DefaultOutputPort<HashMap<K, V>> allafter = new DefaultOutputPort<HashMap<K, V>>(); - boolean doemit = false; - - /** - * Resets the matched variable - * - * @param windowId - */ - @Override - public void beginWindow(long windowId) - { - doemit = false; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java b/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java index ba5e5a1..1989f24 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java +++ b/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java @@ -61,8 +61,9 @@ import com.datatorrent.lib.util.UnifierSumNumber; * @tags count, key value * * @since 0.3.2 + * @deprecated */ - +@Deprecated @OperatorAnnotation(partitionable = true) public class CompareExceptCountMap<K, V extends Number> extends MatchMap<K, V> { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/Distinct.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/Distinct.java b/library/src/main/java/com/datatorrent/lib/algo/Distinct.java index c694bd2..ce29c50 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/Distinct.java +++ b/library/src/main/java/com/datatorrent/lib/algo/Distinct.java @@ -76,10 +76,7 @@ public class Distinct<K> extends BaseKeyOperator<K> implements Unifier<K> @Override public void process(K tuple) { - if (!map.containsKey(tuple)) { - distinct.emit(cloneKey(tuple)); - map.put(cloneKey(tuple), null); - } + Distinct.this.process(tuple); } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java b/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java deleted file mode 100644 index e3d658b..0000000 --- a/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.algo; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OperatorAnnotation; - -import com.datatorrent.lib.util.BaseKeyValueOperator; -import com.datatorrent.lib.util.UnifierHashMap; - -/** - * This operator computes and emits distinct key,val pairs (i.e drops duplicates). - * <p> - * Computes and emits distinct key,val pairs (i.e drops duplicates) - * </p> - * <p> - * This is a pass through operator<br> - * <br> - * This module is same as a "FirstOf" metric on any key,val pair. At end of window all data is flushed.<br> - * <br> - * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br> - * <b>Partitions : Yes, </b> distinct output is unified by unifier hash map operator. <br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: Input data port expects Map<K,V><br> - * <b>distinct</b>: Output data port, emits HashMap<K,V>(1)<br> - * <br> - * </p> - * - * @displayName Distinct Key Value Merge - * @category Stream Manipulators - * @tags filter, unique, key value - * - * @since 0.3.2 - */ - -@OperatorAnnotation(partitionable = true) -public class DistinctMap<K, V> extends BaseKeyValueOperator<K, V> -{ - /** - * The input port on which key value pairs are received. - */ - public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() - { - /** - * Process HashMap<K,V> tuple on input port data, and emits if match not found. Updates the cache - * with new key,val pair - */ - @Override - public void process(Map<K, V> tuple) - { - for (Map.Entry<K, V> e: tuple.entrySet()) { - HashMap<V, Object> vals = mapkeyval.get(e.getKey()); - if ((vals == null) || !vals.containsKey(e.getValue())) { - HashMap<K, V> otuple = new HashMap<K, V>(1); - otuple.put(cloneKey(e.getKey()), cloneValue(e.getValue())); - distinct.emit(otuple); - if (vals == null) { - vals = new HashMap<V, Object>(); - mapkeyval.put(cloneKey(e.getKey()), vals); - } - vals.put(cloneValue(e.getValue()), null); - } - } - } - }; - - /** - * The output port on which distinct key value pairs are emitted. - */ - public final transient DefaultOutputPort<HashMap<K, V>> distinct = new DefaultOutputPort<HashMap<K, V>>() - { - @Override - public Unifier<HashMap<K, V>> getUnifier() - { - return new UnifierHashMap<K, V>(); - } - }; - - - protected HashMap<K, HashMap<V, Object>> mapkeyval = new HashMap<K, HashMap<V, Object>>(); - - /** - * Clears the cache/hash - */ - @Override - public void endWindow() - { - mapkeyval.clear(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java b/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java deleted file mode 100644 index 41259a7..0000000 --- a/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.algo; - -import java.util.HashMap; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OperatorAnnotation; -import com.datatorrent.api.annotation.Stateless; - -import com.datatorrent.lib.util.BaseKeyOperator; - -/** - * This operator filters the incoming stream of tuples using a set of specified key value pairs. - * Tuples that match the filter are emitted by the operator. - * <p> - * Filters the incoming stream based of specified key,val pairs, and emits those that match the filter. If - * property "inverse" is set to "true", then all key,val pairs except those specified by in keyvals parameter are emitted - * </p> - * <p> - * Operator assumes that the key, val pairs are immutable objects. If this operator has to be used for mutable objects, - * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of V.<br> - * This is a pass through node<br> - * <br> - * <b>StateFull : No, </b> tuple are processed in current window. <br> - * <b>Partitions : Yes, </b> no dependency among input tuples. <br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects HashMap<K,V><br> - * <b>filter</b>: emits HashMap<K,V>(1)<br> - * <br> - * <b>Properties</b>:<br> - * <b>keyvals</b>: The keyvals is key,val pairs to pass through, rest are filtered/dropped.<br> - * <br> - * </p> - * - * @displayName Filter Keyval Pairs - * @category Rules and Alerts - * @tags filter, key value - * - * @since 0.3.2 - */ -@Stateless -@OperatorAnnotation(partitionable = true) -public class FilterKeyVals<K,V> extends BaseKeyOperator<K> -{ - /** - * The input port on which key value pairs are received. - */ - public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>() - { - /** - * Processes incoming tuples one key,val at a time. Emits if at least one key makes the cut - * By setting inverse as true, match is changed to un-matched - */ - @Override - public void process(HashMap<K, V> tuple) - { - for (Map.Entry<K, V> e: tuple.entrySet()) { - entry.clear(); - entry.put(e.getKey(),e.getValue()); - boolean contains = keyvals.containsKey(entry); - if ((contains && !inverse) || (!contains && inverse)) { - HashMap<K, V> dtuple = new HashMap<K,V>(1); - dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue())); - filter.emit(dtuple); - } - } - } - }; - - /** - * The output port on which filtered key value pairs are emitted. - */ - public final transient DefaultOutputPort<HashMap<K, V>> filter = new DefaultOutputPort<HashMap<K, V>>(); - - @NotNull() - HashMap<HashMap<K,V>,Object> keyvals = new HashMap<HashMap<K,V>,Object>(); - boolean inverse = false; - private transient HashMap<K,V> entry = new HashMap<K,V>(1); - - /** - * Gets the inverse property. - * @return inverse - */ - public boolean getInverse() - { - return inverse; - } - - /** - * If true then only matches are emitted. If false then only non matches are emitted. - * @param val - */ - public void setInverse(boolean val) - { - inverse = val; - } - - /** - * True means match; False means unmatched - * @return keyvals hash - */ - @NotNull() - public HashMap<HashMap<K,V>,Object> getKeyVals() - { - return keyvals; - } - - /** - * Adds a key to the filter list - * @param map with key,val pairs to set as filters - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void setKeyVals(HashMap<K,V> map) - { - for (Map.Entry<K, V> e: map.entrySet()) { - HashMap kvpair = new HashMap<K,V>(1); - kvpair.put(cloneKey(e.getKey()), cloneValue(e.getValue())); - keyvals.put(kvpair, null); - } - } - - /* - * Clears the filter list - */ - public void clearKeys() - { - keyvals.clear(); - } - - /** - * Clones V object. By default assumes immutable object (i.e. a copy is not made). If object is mutable, override this method - * @param v value to be cloned - * @return returns v as is (assumes immutable object) - */ - public V cloneValue(V v) - { - return v; - } -}
