Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 333a70733 -> 7a6c94c90
MLHR-1813 implement semi-join operator for two input tables Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b042f00a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b042f00a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b042f00a Branch: refs/heads/devel-3 Commit: b042f00a5c70e57872899773ed2a9e30e4a43868 Parents: 333a707 Author: Dongming Liang <[email protected]> Authored: Tue Dec 1 18:17:03 2015 -0800 Committer: Dongming Liang <[email protected]> Committed: Tue Dec 1 18:17:03 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/lib/join/SemiJoinOperator.java | 206 +++++++++++++++++++ .../lib/join/SemiJoinOperatorTest.java | 174 ++++++++++++++++ 2 files changed, 380 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b042f00a/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java new file mode 100644 index 0000000..2ace5c8 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.index.Index; + +/** + * An implementation of Operator that reads table row data from two table data input ports. <br> + * <p> + * Operator semi-joins row on given condition and selected names, emits + * semi-joined result at output port. + * + * Note: A semi-join is not a left-join or left-outer-join. In semi-join only the joined rows + * from the left table are returned. However, in a left-outer-join, all the rows from left table + * will be returned (also padding with nulls for columns from the right table when not joined). + * + * For more information see {@link http://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_10002.htm#i2166436} + * + * <br> + * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br> + * <b>Partitions : No, </b> will yield wrong result(s). 
<br> + * <br> + * <b>Ports : </b> <br> + * <b> inport1 : </b> Input port for table 1, expects HashMap<String, Object> <br> + * <b> inport2 : </b> Input port for table 2, expects HashMap<String, Object> <br> + * <b> outport : </b> Output semi-joined row port, emits HashMap<String, ArrayList<Object>> <br> + * <br> + * <b> Properties : </b> + * <b> joinCondition : </b> Join condition for table rows. <br> + * <b> table1Columns : </b> Columns to be selected from table1. <br> + * <b> table2Columns : </b> Columns to be selected from table2. <br> + * <br> + * + * @displayName Semi join + * @category Join Manipulators + * @tags sql, semi join operator + * @since 0.3.3 + */ +@OperatorAnnotation(partitionable = false) +@Evolving +public class SemiJoinOperator implements Operator +{ + + /** + * Join Condition. + */ + private Condition joinCondition; + + /** + * Table1 select columns. + * Note: only left table (Table1) will be output in an semi-join + */ + private ArrayList<Index> table1Columns = new ArrayList<>(); + + /** + * Collect data rows from input port 1. + */ + private List<Map<String, Object>> table1; + + /** + * Collect data from input port 2. + */ + private List<Map<String, Object>> table2; + + /** + * Input port 1 that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table1.add(tuple); + for (int j = 0; j < table2.size(); j++) { + if ((joinCondition != null) && (joinCondition.isValidJoin(tuple, table2.get(j)))) { + joinRows(tuple); + // row has been join, and can be removed now + table1.remove(tuple); + } + } + } + }; + + /** + * Input port 2 that takes a map of <string,object>. 
+ */ + public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table2.add(tuple); + + for (int j = 0; j < table1.size(); j++) { + if ((joinCondition != null) + && (joinCondition.isValidJoin(table1.get(j), tuple))) { + joinRows(table1.get(j)); + table1.remove(table1.get(j)); + } + } + } + }; + + /** + * Output port that emits a map of <string,object>. + */ + public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<>(); + + @Override + public void setup(OperatorContext arg0) + { + table1 = new ArrayList<>(); + table2 = new ArrayList<>(); + } + + @Override + public void teardown() + { + } + + @Override + public void beginWindow(long arg0) + { + } + + @Override + public void endWindow() + { + table1.clear(); + table2.clear(); + } + + /** + * @return the joinCondition + */ + public Condition getJoinCondition() + { + return joinCondition; + } + + /** + * Pick the supported condition. Currently only equal join is supported. + * + * @param joinCondition - join condition + */ + public void setJoinCondition(Condition joinCondition) + { + this.joinCondition = joinCondition; + } + + /** + * Select table1 column name. + */ + public void selectTable1Column(Index column) + { + table1Columns.add(column); + } + + /** + * Join row from table1 (only left table is used in semi-join). 
+ */ + protected void joinRows(Map<String, Object> row) + { + // joined row + Map<String, Object> join = new HashMap<>(); + + // filter table1 columns + if (row != null) { + for (Index index : table1Columns) { + index.filter(row, join); + } + } + + // emit row + outport.emit(join); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b042f00a/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java new file mode 100644 index 0000000..413f05f --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.join;

import java.util.HashMap;

import org.junit.Assert;
import org.junit.Test;

import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
import com.datatorrent.lib.streamquery.index.ColumnIndex;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;

/**
 *
 * Functional test for {@link SemiJoinOperator }.
 *
 */
public class SemiJoinOperatorTest
{
  /**
   * Builds a fresh row of table1 with columns a, b and c.
   */
  private static HashMap<String, Object> leftRow(int a, int b, int c)
  {
    HashMap<String, Object> row = new HashMap<>();
    row.put("a", a);
    row.put("b", b);
    row.put("c", c);
    return row;
  }

  /**
   * Builds a fresh row of table2 with columns a, e and f.
   */
  private static HashMap<String, Object> rightRow(int a, int e, int f)
  {
    HashMap<String, Object> row = new HashMap<>();
    row.put("a", a);
    row.put("e", e);
    row.put("f", f);
    return row;
  }

  @Test
  public void testSqlSelect()
  {
    // create operator
    SemiJoinOperator oper = new SemiJoinOperator();
    CollectorTestSink sink = new CollectorTestSink();
    TestUtils.setSink(oper.outport, sink);

    // set column join condition
    Condition cond = new JoinColumnEqualCondition("a", "a");
    oper.setJoinCondition(cond);

    // add columns (only columns from the left table)
    oper.selectTable1Column(new ColumnIndex("b", null));
    oper.selectTable1Column(new ColumnIndex("c", null));

    oper.setup(null);

    /**
     * test 1, positive result
     *
     * table1                table2
     * ======                ======
     * a    b    c           a    e    f
     * -------------         -----------
     * 0    1    2           0    7    8
     * 1    3    4           2    5    6
     * 0    6    8
     *
     * select b, c
     * from table1 semi-join table2
     * where table1.a = table2.a
     *
     * result:
     *
     * b    c
     * -------------
     * 1    2
     * 6    8
     */
    oper.beginWindow(1);
    oper.inport1.process(leftRow(0, 1, 2));
    oper.inport1.process(leftRow(1, 3, 4));
    oper.inport1.process(leftRow(0, 6, 8));

    oper.inport2.process(rightRow(0, 7, 8));
    oper.inport2.process(rightRow(2, 5, 6));

    oper.endWindow();

    // expected semi-joined result: {1,2}, {6, 8} (two rows)
    Assert.assertEquals("number of semi-join result", 2, sink.collectedTuples.size());
    sink.clear();

    /**
     * test 2, negative result (empty result)
     *
     * table1                table2
     * ======                ======
     * a    b    c           a    e    f
     * -------------         -----------
     * 1    1    2           0    7    8
     * 1    3    4           2    5    6
     * 3    6    8
     *
     * select b, c
     * from table1 semi-join table2
     * where table1.a = table2.a
     *
     * result:
     *
     * b    c
     * -------------
     *
     * (0 row)
     */
    oper.beginWindow(2);
    // every row is a new map, so nothing from the first scenario leaks into this one
    oper.inport1.process(leftRow(1, 1, 2));
    oper.inport1.process(leftRow(1, 3, 4));
    oper.inport1.process(leftRow(3, 6, 8));

    oper.inport2.process(rightRow(0, 7, 8));
    oper.inport2.process(rightRow(2, 5, 6));

    oper.endWindow();
    oper.teardown();

    // expected semi-joined result: [] (empty)
    Assert.assertEquals("number of semi-join result (empty)", 0, sink.collectedTuples.size());
  }
}
