Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 333a70733 -> 7a6c94c90


MLHR-1813 implement semi-join operator for two input tables


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b042f00a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b042f00a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b042f00a

Branch: refs/heads/devel-3
Commit: b042f00a5c70e57872899773ed2a9e30e4a43868
Parents: 333a707
Author: Dongming Liang <[email protected]>
Authored: Tue Dec 1 18:17:03 2015 -0800
Committer: Dongming Liang <[email protected]>
Committed: Tue Dec 1 18:17:03 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/join/SemiJoinOperator.java  | 206 +++++++++++++++++++
 .../lib/join/SemiJoinOperatorTest.java          | 174 ++++++++++++++++
 2 files changed, 380 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b042f00a/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java
new file mode 100644
index 0000000..2ace5c8
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/SemiJoinOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * An implementation of Operator that reads table row data from two table data 
input ports. <br>
+ * <p>
+ * Operator semi-joins row on given condition and selected names, emits
+ * semi-joined result at output port.
+ *
+ * Note: A semi-join is not a left-join or left-outer-join. In semi-join only 
the joined rows
+ * from the left table are returned. However, in a left-outer-join, all the 
rows from left table
+ * will be returned (also padding with nulls for columns from the right table 
when not joined).
+ *
+ * For more information see {@link 
http://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_10002.htm#i2166436}
+ *
+ * <br>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. 
<br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> inport1 : </b> Input port for table 1, expects HashMap&lt;String, 
Object&gt; <br>
+ * <b> inport2 : </b> Input port for table 2, expects HashMap&lt;String, 
Object&gt; <br>
+ * <b> outport : </b> Output semi-joined row port, emits HashMap&lt;String, 
ArrayList&lt;Object&gt;&gt; <br>
+ * <br>
+ * <b> Properties : </b>
+ * <b> joinCondition : </b> Join condition for table rows. <br>
+ * <b> table1Columns : </b> Columns to be selected from table1. <br>
+ * <b> table2Columns : </b> Columns to be selected from table2. <br>
+ * <br>
+ *
+ * @displayName Semi join
+ * @category Join Manipulators
+ * @tags sql, semi join operator
+ * @since 0.3.3
+ */
+@OperatorAnnotation(partitionable = false)
+@Evolving
+public class SemiJoinOperator implements Operator
+{
+
+  /**
+   * Join Condition.
+   */
+  private Condition joinCondition;
+
+  /**
+   * Table1 select columns.
+   * Note: only left table (Table1) will be output in an semi-join
+   */
+  private ArrayList<Index> table1Columns = new ArrayList<>();
+
+  /**
+   * Collect data rows from input port 1.
+   */
+  private List<Map<String, Object>> table1;
+
+  /**
+   * Collect data from input port 2.
+   */
+  private List<Map<String, Object>> table2;
+
+  /**
+   * Input port 1 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport1 = new 
DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table1.add(tuple);
+      for (int j = 0; j < table2.size(); j++) {
+        if ((joinCondition != null) && (joinCondition.isValidJoin(tuple, 
table2.get(j)))) {
+          joinRows(tuple);
+          // row has been join, and can be removed now
+          table1.remove(tuple);
+        }
+      }
+    }
+  };
+
+  /**
+   * Input port 2 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport2 = new 
DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table2.add(tuple);
+
+      for (int j = 0; j < table1.size(); j++) {
+        if ((joinCondition != null)
+            && (joinCondition.isValidJoin(table1.get(j), tuple))) {
+          joinRows(table1.get(j));
+          table1.remove(table1.get(j));
+        }
+      }
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new 
DefaultOutputPort<>();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    table1 = new ArrayList<>();
+    table2 = new ArrayList<>();
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long arg0)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    table1.clear();
+    table2.clear();
+  }
+
+  /**
+   * @return the joinCondition
+   */
+  public Condition getJoinCondition()
+  {
+    return joinCondition;
+  }
+
+  /**
+   * Pick the supported condition. Currently only equal join is supported.
+   *
+   * @param joinCondition - join condition
+   */
+  public void setJoinCondition(Condition joinCondition)
+  {
+    this.joinCondition = joinCondition;
+  }
+
+  /**
+   * Select table1 column name.
+   */
+  public void selectTable1Column(Index column)
+  {
+    table1Columns.add(column);
+  }
+
+  /**
+   * Join row from table1 (only left table is used in semi-join).
+   */
+  protected void joinRows(Map<String, Object> row)
+  {
+    // joined row
+    Map<String, Object> join = new HashMap<>();
+
+    // filter table1 columns
+    if (row != null) {
+      for (Index index : table1Columns) {
+        index.filter(row, join);
+      }
+    }
+
+    // emit row
+    outport.emit(join);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b042f00a/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java
new file mode 100644
index 0000000..413f05f
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/SemiJoinOperatorTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Functional test for {@link SemiJoinOperator}.
+ */
+public class SemiJoinOperatorTest
+{
+  /**
+   * Builds a fresh table1 row with columns a, b, c.
+   */
+  private static HashMap<String, Object> table1Row(int a, int b, int c)
+  {
+    HashMap<String, Object> row = new HashMap<>();
+    row.put("a", a);
+    row.put("b", b);
+    row.put("c", c);
+    return row;
+  }
+
+  /**
+   * Builds a fresh table2 row with columns a, e, f.
+   */
+  private static HashMap<String, Object> table2Row(int a, int e, int f)
+  {
+    HashMap<String, Object> row = new HashMap<>();
+    row.put("a", a);
+    row.put("e", e);
+    row.put("f", f);
+    return row;
+  }
+
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SemiJoinOperator oper = new SemiJoinOperator();
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.outport, sink);
+
+    // set column join condition
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+
+    // add columns (only columns from the left table)
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable1Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+
+    /*
+     * test 1, positive result
+     *
+     * table1                      table2
+     * ======                      ======
+     *    a     b    c             a    e    f
+     *    -------------            -----------
+     *    0     1    2             0    7    8
+     *    1     3    4             2    5    6
+     *    0     6    8
+     *
+     * select b, c
+     * from   table1 semi-join table2
+     * where  table1.a = table2.a
+     *
+     * result:
+     *
+     *         b    c
+     *    -------------
+     *         1    2
+     *         6    8
+     */
+    oper.beginWindow(1);
+    oper.inport1.process(table1Row(0, 1, 2));
+    oper.inport1.process(table1Row(1, 3, 4));
+    oper.inport1.process(table1Row(0, 6, 8));
+    oper.inport2.process(table2Row(0, 7, 8));
+    oper.inport2.process(table2Row(2, 5, 6));
+    oper.endWindow();
+
+    // expected semi-joined result: {1,2}, {6, 8} (two rows)
+    Assert.assertEquals("number of semi-join result", 2, sink.collectedTuples.size());
+    sink.clear();
+
+    /*
+     * test 2, negative result (empty result)
+     *
+     * table1                      table2
+     * ======                      ======
+     *    a     b    c             a    e    f
+     *    -------------            -----------
+     *    1     1    2             0    7    8
+     *    1     3    4             2    5    6
+     *    3     6    8
+     *
+     * select b, c
+     * from   table1 semi-join table2
+     * where  table1.a = table2.a
+     *
+     * result:
+     *
+     *         b    c
+     *    -------------
+     *
+     *    (0 row)
+     */
+    oper.beginWindow(2);
+    // every row is a fresh map, so no columns leak over from test 1's rows
+    oper.inport1.process(table1Row(1, 1, 2));
+    oper.inport1.process(table1Row(1, 3, 4));
+    oper.inport1.process(table1Row(3, 6, 8));
+    oper.inport2.process(table2Row(0, 7, 8));
+    oper.inport2.process(table2Row(2, 5, 6));
+    oper.endWindow();
+    oper.teardown();
+
+    // expected semi-joined result: [] (empty)
+    Assert.assertEquals("number of semi-join result (empty)", 0, sink.collectedTuples.size());
+  }
+}

Reply via email to