Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 228d46cec -> 217f8db47
MLHR-1812 implement anti join operator Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9dac355a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9dac355a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9dac355a Branch: refs/heads/devel-3 Commit: 9dac355a4e8eb4185e9d973db11bfbf408a0dc4f Parents: 5b9eff8 Author: Dongming Liang <[email protected]> Authored: Fri Nov 20 16:12:18 2015 -0800 Committer: Dongming Liang <[email protected]> Committed: Fri Nov 20 16:12:18 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/lib/join/AntiJoinOperator.java | 203 +++++++++++++++++++ .../lib/join/AntiJoinOperatorTest.java | 119 +++++++++++ 2 files changed, 322 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9dac355a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java new file mode 100644 index 0000000..382a0d6 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.index.Index; + + +/** + * An implementation of Operator that reads table row data from two table data input ports. <br> + * <p> + * Operator anti-joins rows on the given condition: every table1 row that matches no table2 row + * received in the same application window is emitted at the output port, projected to the + * selected table1 columns. + * <br> + * <b>Stateful : Yes,</b> Operator aggregates input over application window. <br> + * <b>Partitions : No, </b> will yield wrong result(s). <br> + * <br> + * <b>Ports : </b> <br> + * <b> inport1 : </b> Input port for table 1, expects Map<String, Object> <br> + * <b> inport2 : </b> Input port for table 2, expects Map<String, Object> <br> + * <b> outport : </b> Output anti-joined row port, emits Map<String, Object> <br> + * <br> + * <b> Properties : </b> <br> + * <b> joinCondition : </b> Join condition for table rows. <br> + * <b> table1Columns : </b> Columns to be selected from table1. <br> + * <b> table2Columns : </b> Not applicable; an anti-join emits only table1 columns.
<br> + * <br> + * + * @displayName Anti join + * @category Stream Manipulators + * @tags sql, anti join operator + * @since 0.3.3 + */ +@OperatorAnnotation(partitionable = false) +@Evolving +public class AntiJoinOperator implements Operator +{ + + /** + * Join Condition; + */ + private Condition joinCondition; + + /** + * Table1 select columns. + * Note: only left table (Table1) will be output in an Anti-join + */ + private ArrayList<Index> table1Columns = new ArrayList<>(); + + /** + * Collect data rows from input port 1. + */ + private List<Map<String, Object>> table1; + + /** + * Collect data from input port 2. + */ + private List<Map<String, Object>> table2; + + /** + * Input port 1 that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table1.add(tuple); + for (int j = 0; j < table2.size(); j++) { + if ((joinCondition != null) && (joinCondition.isValidJoin(tuple, table2.get(j)))) { + table1.remove(tuple); + } + } + } + }; + + /** + * Input port 2 that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table2.add(tuple); + + for (int j = 0; j < table1.size(); j++) { + if ((joinCondition != null) + && (joinCondition.isValidJoin(table1.get(j), tuple))) { + table1.remove(table1.get(j)); + } + } + } + }; + + /** + * Output port that emits a map of <string,object>. 
+ */ + public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<>(); + + @Override + public void setup(OperatorContext arg0) + { + table1 = new ArrayList<>(); + table2 = new ArrayList<>(); + } + + @Override + public void teardown() + { + } + + @Override + public void beginWindow(long arg0) + { + } + + @Override + public void endWindow() + { + /* All joined rows have been removed + * The ones left are the anti-joined result + */ + for (int i = 0; i < table1.size(); i++) { + joinRows(table1.get(i)); + } + + table1.clear(); + table2.clear(); + } + + /** + * @return the joinCondition + */ + public Condition getJoinCondition() + { + return joinCondition; + } + + /** + * Pick the supported condition. Currently only equal join is supported. + * + * @param joinCondition - join condition + */ + public void setJoinCondition(Condition joinCondition) + { + this.joinCondition = joinCondition; + } + + /** + * Select table1 column name. + */ + public void selectTable1Column(Index column) + { + table1Columns.add(column); + } + + /** + * Join row from table1 and table2. 
+ */ + protected void joinRows(Map<String, Object> row) + { + // joined row + Map<String, Object> join = new HashMap<>(); + + // filter table1 columns + if (row != null) { + for (Index index : table1Columns) { + index.filter(row, join); + } + } + + // emit row + outport.emit(join); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9dac355a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java new file mode 100644 index 0000000..4241a80 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.HashMap; +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +/** + * + * Functional test for {@link AntiJoinOperator }. + * + */ +public class AntiJoinOperatorTest +{ + @Test + public void testSqlSelect() + { + // create operator + AntiJoinOperator oper = new AntiJoinOperator(); + CollectorTestSink sink = new CollectorTestSink(); + TestUtils.setSink(oper.outport, sink); + + // set column join condition + Condition cond = new JoinColumnEqualCondition("a", "a"); + oper.setJoinCondition(cond); + + // add columns + oper.selectTable1Column(new ColumnIndex("b", null)); + oper.selectTable1Column(new ColumnIndex("c", null)); + + oper.setup(null); + HashMap<String, Object> tuple = new HashMap<>(); + + // test 1, positive result + oper.beginWindow(1); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + tuple = new HashMap<>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<>(); + tuple.put("a", 2); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + + // expected anti-joined result: {b=3, c=4} + Assert.assertEquals("number of anti-join result", 1, sink.collectedTuples.size()); + sink.clear(); + + // test 2, negative result (empty result) + oper.beginWindow(2); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); 
+ oper.inport1.process(tuple); + + tuple = new HashMap<>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + // expected anti-joined result: [] + Assert.assertEquals("number of anti-join result (empty)", 0, sink.collectedTuples.size()); + } +}
