http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java new file mode 100644 index 0000000..16f1036 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/AbstractSqlStreamOperator.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.ArrayList; +import java.util.HashMap; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * A base implementation of a BaseOperator that is a sql stream operator. 
Subclasses should provide the + implementation of how to process the tuples. + * <p> + * Abstract sql db input operator. + * <p> + * @displayName Abstract Sql Stream + * @category Stream Manipulators + * @tags sql operator + * @since 0.3.2 + * @deprecated + */ +@Deprecated +public abstract class AbstractSqlStreamOperator extends BaseOperator +{ + public static class InputSchema + { + public static class ColumnInfo + { + public String type; + public int bindIndex = 0; + public boolean isColumnIndex = false; + } + + /** + * the name of the input "table" + */ + public String name; + /** + * key is the name of the column, and value is the SQL type + */ + public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>(); + + public InputSchema() + { + } + + public InputSchema(String name) + { + this.name = name; + } + + public void setColumnInfo(String columnName, String columnType, boolean isColumnIndex) + { + ColumnInfo t = new ColumnInfo(); + t.type = columnType; + t.isColumnIndex = isColumnIndex; + columnInfoMap.put(columnName, t); + } + + } + + protected String statement; + protected ArrayList<InputSchema> inputSchemas = new ArrayList<InputSchema>(5); + protected transient ArrayList<Object> bindings; + + /** + * Input bindings port that takes an arraylist of objects. + */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ArrayList<Object>> bindingsPort = new DefaultInputPort<ArrayList<Object>>() + { + @Override + public void process(ArrayList<Object> tuple) + { + bindings = tuple; + } + + }; + + /** + * Input port in1 that takes a hashmap of <string,object>. + */ + public final transient DefaultInputPort<HashMap<String, Object>> in1 = new DefaultInputPort<HashMap<String, Object>>() + { + @Override + public void process(HashMap<String, Object> tuple) + { + processTuple(0, tuple); + } + + }; + + /** + * Input port in2 that takes a hashmap of <string,object>. 
+ */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<HashMap<String, Object>> in2 = new DefaultInputPort<HashMap<String, Object>>() + { + @Override + public void process(HashMap<String, Object> tuple) + { + processTuple(1, tuple); + } + + }; + + /** + * Input port in3 that takes a hashmap of <string,object>. + */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<HashMap<String, Object>> in3 = new DefaultInputPort<HashMap<String, Object>>() + { + @Override + public void process(HashMap<String, Object> tuple) + { + processTuple(2, tuple); + } + + }; + + /** + * Input port in4 that takes a hashmap of <string,object>. + */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<HashMap<String, Object>> in4 = new DefaultInputPort<HashMap<String, Object>>() + { + @Override + public void process(HashMap<String, Object> tuple) + { + processTuple(3, tuple); + } + + }; + + /** + * Input port in5 that takes a hashmap of <string,object>. + */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<HashMap<String, Object>> in5 = new DefaultInputPort<HashMap<String, Object>>() + { + @Override + public void process(HashMap<String, Object> tuple) + { + processTuple(4, tuple); + } + + }; + + /** + * Output result port that emits a hashmap of <string,object>. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<HashMap<String, Object>> result = new DefaultOutputPort<HashMap<String, Object>>(); + + public void setStatement(String statement) + { + this.statement = statement; + } + + public String getStatement() + { + return this.statement; + } + + public void setInputSchema(int inputPortIndex, InputSchema inputSchema) + { + inputSchemas.add(inputPortIndex, inputSchema); + } + + public abstract void processTuple(int tableNum, HashMap<String, Object> tuple); + +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java new file mode 100644 index 0000000..7faf96d --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperator.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.streamquery.condition.Condition; + +/** + * An implementation of BaseOperator that provides sql delete query semantic on live data stream. <br> + * <p> + * Stream rows passing condition are emitted on output port stream. <br> + * <br> + * <b>StateFull : NO,</b> all row data is processed in current time window. 
<br> + * <b>Partitions : Yes, </b> No Input dependency among input rows. <br> + * <br> + * <b>Ports</b>:<br> + * <b> inport : </b> Input hash map(row) port, expects + * HashMap<String,Object><<br> + * <b> outport : </b> Output hash map(row) port, emits + * HashMap<String,Object><br> + * <br> + * <b> Properties : <b> <br> + * <b> condition : </b> Select condition for selecting rows. <br> + * <b> columns : </b> Column names/aggregate functions for select. <br> + * <br> + * @displayName Delete + * @category Stream Manipulators + * @tags sql delete operator + * @since 0.3.3 + * @deprecated + */ +@Deprecated +public class DeleteOperator extends BaseOperator +{ + + /** + * condition. + */ + private Condition condition = null; + + /** + * set condition. + */ + public void setCondition(Condition condition) + { + this.condition = condition; + } + + /** + * Input port that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() + { + + @Override + public void process(Map<String, Object> tuple) + { + if ((condition != null) && (!condition.isValidRow(tuple))) { + outport.emit(tuple); + } + } + }; + + /** + * Output port emits a map of <string,object>. 
+ */ + public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java new file mode 100644 index 0000000..a55c7d7 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/DerbySqlStreamOperator.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo; + +import com.datatorrent.api.Context.OperatorContext; + +/** + * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator. + * <p> + * @displayName Derby Sql Stream + * @category Stream Manipulators + * @tags sql, in-memory, input operator + * @since 0.3.2 + * @deprecated + */ +@Deprecated +public class DerbySqlStreamOperator extends AbstractSqlStreamOperator +{ + protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5); + protected List<String> execStmtStringList = new ArrayList<String>(); + protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5); + protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5); + protected transient Connection db; + + public void addExecStatementString(String stmt) + { + this.execStmtStringList.add(stmt); + } + + + @Override + public void setup(OperatorContext context) + { + System.setProperty("derby.stream.error.file", "/dev/null"); + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + + String connUrl = "jdbc:derby:memory:MALHAR_TEMP;create=true"; + PreparedStatement st; + + try { + db = DriverManager.getConnection(connUrl); + // create the temporary tables here + for (int i = 0; i < inputSchemas.size(); i++) { + InputSchema inputSchema = inputSchemas.get(i); + if (inputSchema == null || 
inputSchema.columnInfoMap.isEmpty()) { + continue; + } + String columnSpec = ""; + String columnNames = ""; + String insertQuestionMarks = ""; + int j = 0; + for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) { + if (!columnSpec.isEmpty()) { + columnSpec += ","; + columnNames += ","; + insertQuestionMarks += ","; + } + columnSpec += entry.getKey(); + columnSpec += " "; + columnSpec += entry.getValue().type; + columnNames += entry.getKey(); + insertQuestionMarks += "?"; + entry.getValue().bindIndex = ++j; + } + String createTempTableStmt = + "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED"; + st = db.prepareStatement(createTempTableStmt); + st.execute(); + st.close(); + + String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES (" + + insertQuestionMarks + ")"; + + insertStatements.add(i, db.prepareStatement(insertStmt)); + deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." 
+ inputSchema.name)); + } + for (String stmtStr : execStmtStringList) { + execStatements.add(db.prepareStatement(stmtStr)); + } + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void beginWindow(long windowId) + { + try { + db.setAutoCommit(false); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void processTuple(int tableNum, HashMap<String, Object> tuple) + { + InputSchema inputSchema = inputSchemas.get(tableNum); + + PreparedStatement insertStatement = insertStatements.get(tableNum); + try { + for (Map.Entry<String, Object> entry : tuple.entrySet()) { + ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey()); + if (t != null && t.bindIndex != 0) { + insertStatement.setString(t.bindIndex, entry.getValue().toString()); + } + } + + insertStatement.executeUpdate(); + insertStatement.clearParameters(); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void endWindow() + { + try { + db.commit(); + if (bindings != null) { + for (int i = 0; i < bindings.size(); i++) { + for (PreparedStatement stmt : execStatements) { + stmt.setString(i, bindings.get(i).toString()); + } + } + } + + for (PreparedStatement stmt : execStatements) { + executePreparedStatement(stmt); + } + for (PreparedStatement st : deleteStatements) { + st.executeUpdate(); + st.clearParameters(); + } + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + bindings = null; + } + + private void executePreparedStatement(PreparedStatement statement) throws SQLException + { + ResultSet res = statement.executeQuery(); + ResultSetMetaData resmeta = res.getMetaData(); + int columnCount = resmeta.getColumnCount(); + while (res.next()) { + HashMap<String, Object> resultRow = new HashMap<String, Object>(); + for (int i = 1; i <= columnCount; i++) { + resultRow.put(resmeta.getColumnName(i), res.getObject(i)); + } + this.result.emit(resultRow); + } + 
statement.clearParameters(); + } + + @Override + public void teardown() + { + try { + db.close(); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java new file mode 100644 index 0000000..9999429 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByHavingOperator.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCondition; +import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.streamquery.index.ColumnIndex; +import com.datatorrent.lib.streamquery.condition.Condition; +@Deprecated +public class GroupByHavingOperator extends BaseOperator +{ + + /** + * aggregate indexes. + */ + private ArrayList<FunctionIndex> aggregates = new ArrayList<FunctionIndex>(); + + /** + * Column, Group by names + */ + private ArrayList<ColumnIndex> columnGroupIndexes = new ArrayList<ColumnIndex>(); + + /** + * where condition. + */ + private Condition condition; + + /** + * having aggregate condtion; + */ + private ArrayList<HavingCondition> havingConditions = new ArrayList<HavingCondition>(); + + /** + * Table rows. + */ + private ArrayList<Map<String, Object>> rows = new ArrayList<Map<String, Object>>(); + + public void addAggregateIndex(@NotNull FunctionIndex index) + { + aggregates.add(index); + } + + public void addColumnGroupByIndex(@NotNull ColumnIndex index) + { + columnGroupIndexes.add(index); + } + + public void addHavingCondition(@NotNull HavingCondition condition) + { + havingConditions.add(condition); + } + + /** + * @param condition condition + */ + public void setCondition(Condition condition) + { + this.condition = condition; + } + + /** + * Input port that takes a map of <string,object>. 
+ */ + public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() + { + + @Override + public void process(Map<String, Object> tuple) + { + if ((condition != null) && (!condition.isValidRow(tuple))) { + return; + } + rows.add(tuple); + } + }; + + /** + * Output port that emits a map of <string,object>. + */ + public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); + + /** + * Create aggregate at end window. + */ + @Override + public void endWindow() + { + // group names + if (columnGroupIndexes.size() == 0) { + rows = new ArrayList<Map<String, Object>>(); + return; + } + + // group rows + HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>> groups = new HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>>(); + for (Map<String, Object> row : rows) { + MultiKeyCompare key = new MultiKeyCompare(); + for (ColumnIndex index : columnGroupIndexes) { + key.addCompareKey(row.get(index.getColumn())); + } + ArrayList<Map<String, Object>> subRows; + if (groups.containsKey(key)) { + subRows = groups.get(key); + } else { + subRows = new ArrayList<Map<String, Object>>(); + groups.put(key, subRows); + } + subRows.add(row); + } + + // Iterate over groups and emit aggregate values + for (Map.Entry<MultiKeyCompare, ArrayList<Map<String, Object>>> entry : groups + .entrySet()) { + ArrayList<Map<String, Object>> subRows = entry.getValue(); + + // get result + Map<String, Object> result = new HashMap<String, Object>(); + for (ColumnIndex index : columnGroupIndexes) { + index.filter(subRows.get(0), result); + } + + // append aggregate values + for (FunctionIndex aggregate : aggregates) { + try { + aggregate.filter(subRows, result); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // check valid having aggregate + boolean isValidHaving = true; + for (HavingCondition condition : havingConditions) { + try { + isValidHaving &= 
condition.isValidAggregate(subRows); + } catch (Exception e) { + e.printStackTrace(); + return; + } + } + if (isValidHaving) { + outport.emit(result); + } + } + + rows = new ArrayList<Map<String, Object>>(); + } + + /** + * multi key compare class. + */ + @SuppressWarnings("rawtypes") + private class MultiKeyCompare implements Comparable + { + + /** + * compare keys. + */ + ArrayList<Object> compareKeys = new ArrayList<Object>(); + + @Override + public boolean equals(Object other) + { + if (other instanceof MultiKeyCompare) { + if (compareKeys.size() != ((MultiKeyCompare)other).compareKeys.size()) { + return false; + } + } + for (int i = 0; i < compareKeys.size(); i++) { + if (!(compareKeys.get(i).equals(((MultiKeyCompare)other).compareKeys.get(i)))) { + return false; + } + } + return true; + } + + @Override + public int hashCode() + { + int hashCode = 0; + for (int i = 0; i < compareKeys.size(); i++) { + hashCode += compareKeys.get(i).hashCode(); + } + return hashCode; + } + + @Override + public int compareTo(Object other) + { + if (this.equals(other)) { + return 0; + } + return -1; + } + + /** + * Add compare key. + */ + public void addCompareKey(Object value) + { + compareKeys.add(value); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java new file mode 100644 index 0000000..d3e11c3 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperator.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.streamquery.condition.Condition; +import com.datatorrent.lib.streamquery.index.Index; + +/** + * An implementation of Operator that reads table row data from two table data input ports. <br> + * <p> + * Operator joins row on given condition and selected names, emits + * joined result at output port. + * <br> + * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br> + * <b>Partitions : No, </b> will yield wrong result(s). <br> + * <br> + * <b>Ports : </b> <br> + * <b> inport1 : </b> Input port for table 1, expects HashMap<String, Object> <br> + * <b> inport1 : </b> Input port for table 2, expects HashMap<String, Object> <br> + * <b> outport : </b> Output joined row port, emits HashMap<String, ArrayList<Object>> <br> + * <br> + * <b> Properties : </b> + * <b> joinCondition : </b> Join condition for table rows. 
<br> + * <b> table1Columns : </b> Columns to be selected from table1. <br> + * <b> table2Columns : </b> Columns to be selected from table2. <br> + * <br> + * @displayName Inner join + * @category Stream Manipulators + * @tags sql, inner join operator + * + * @since 0.3.3 + * @deprecated + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public class InnerJoinOperator implements Operator +{ + + /** + * Join Condition; + */ + protected Condition joinCondition; + + /** + * Table1 select columns. + */ + private ArrayList<Index> table1Columns = new ArrayList<Index>(); + + /** + * Table2 select columns. + */ + private ArrayList<Index> table2Columns = new ArrayList<Index>(); + + /** + * Collect data rows from input port 1. + */ + protected ArrayList<Map<String, Object>> table1; + + /** + * Collect data from input port 2. + */ + protected ArrayList<Map<String, Object>> table2; + + /** + * Input port 1 that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table1.add(tuple); + for (int j = 0; j < table2.size(); j++) { + if ((joinCondition == null) || (joinCondition.isValidJoin(tuple, table2.get(j)))) { + joinRows(tuple, table2.get(j)); + } + } + } + }; + + /** + * Input port 2 that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table2.add(tuple); + for (int j = 0; j < table1.size(); j++) { + if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), tuple))) { + joinRows(table1.get(j), tuple); + } + } + } + }; + + /** + * Output port that emits a map of <string,object>. 
+ */ + public final transient DefaultOutputPort<Map<String, Object>> outport = + new DefaultOutputPort<Map<String, Object>>(); + + @Override + public void setup(OperatorContext arg0) + { + table1 = new ArrayList<Map<String, Object>>(); + table2 = new ArrayList<Map<String, Object>>(); + } + + @Override + public void teardown() + { + } + + @Override + public void beginWindow(long arg0) + { + } + + @Override + public void endWindow() + { + table1.clear(); + table2.clear(); + } + + /** + * @return the joinCondition + */ + public Condition getJoinCondition() + { + return joinCondition; + } + + /** + * Pick the supported condition. Currently only equal join is supported. + * @param joinCondition joinCondition + */ + public void setJoinCondition(Condition joinCondition) + { + this.joinCondition = joinCondition; + } + + /** + * Select table1 column name. + */ + public void selectTable1Column(Index column) + { + table1Columns.add(column); + } + + /** + * Select table2 column name. + */ + public void selectTable2Column(Index column) + { + table2Columns.add(column); + } + + /** + * Join row from table1 and table2. 
+ */ + protected void joinRows(Map<String, Object> row1, Map<String, Object> row2) + { + // joined row + Map<String, Object> join = new HashMap<String, Object>(); + + // filter table1 columns + if (row1 != null) { + for (Index index: table1Columns) { + index.filter(row1, join); + } + } + + // filter table1 columns + if (row2 != null) { + for (Index index: table2Columns) { + index.filter(row2, join); + } + } + + // emit row + outport.emit(join); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java new file mode 100644 index 0000000..c7a5b25 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperator.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.Unifier;
+
+/**
+ * An implementation of Operator that provides SQL ORDER BY semantics over live stream data. <br>
+ * <p>
+ * Rows received during an application window are ordered by the order by rules and emitted on the output port
+ * at the end of the window, in ascending order unless the descending flag is set. <br>
+ * <br>
+ * <b>Stateful : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : Yes, </b> This operator is also the unifier on its output port. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects {@code Map<String, Object>}<br>
+ * <b> outport : </b> Output hash map(row) port, emits {@code Map<String, Object>}<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> orderByRules : </b>List of order by rules for tuples; see also the descending flag.
+ * @displayName OrderBy
+ * @category Stream Manipulators
+ * @tags orderby operator
+ * @since 0.3.5
+ * @deprecated
+ */
+@Deprecated
+public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
+{
+  /**
+   * Order by rules, applied one after another in list order (the last rule applied dominates the final order).
+   */
+  ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>();
+
+  /**
+   * When true rows are emitted in descending order, otherwise in ascending order.
+   */
+  private boolean isDescending;
+
+  /**
+   * Rows collected during the current application window.
+   */
+  private ArrayList<Map<String, Object>> rows;
+
+  /**
+   * Appends an order by rule; rules are applied in the order they were added.
+   */
+  public void addOrderByRule(OrderByRule<?> rule)
+  {
+    orderByRules.add(rule);
+  }
+
+  /**
+   * @return true if rows are emitted in descending order
+   */
+  public boolean isDescending()
+  {
+    return isDescending;
+  }
+
+  /**
+   * @param isDescending true to emit rows in descending order, false for ascending order
+   */
+  public void setDescending(boolean isDescending)
+  {
+    this.isDescending = isDescending;
+  }
+
+  @Override
+  public void process(Map<String, Object> tuple)
+  {
+    rows.add(tuple); // unifier input: buffered exactly like tuples arriving on inport
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    rows = new ArrayList<Map<String, Object>>();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (OrderByRule<?> rule : orderByRules) {
+      rows = rule.sort(rows);
+    }
+    if (!isDescending) { // rules sort ascending: walk forward for ascending output, backward for descending
+      for (int i = 0; i < rows.size(); i++) {
+        outport.emit(rows.get(i));
+      }
+    } else {
+      for (int i = rows.size() - 1; i >= 0; i--) {
+        outport.emit(rows.get(i));
+      }
+    }
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    // nothing to initialize; the row buffer is created in beginWindow
+
+  }
+
+  @Override
+  public void teardown()
+  {
+    // no resources to release
+
+  }
+
+  /**
+   * Input port that takes {@code Map<String, Object>} rows; they are buffered until endWindow.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      rows.add(tuple);
+    }
+  };
+
+  /**
+   * Output port that emits the ordered rows; its unifier is configured with the same rules and descending flag.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>()
+  {
+    @Override
+    public Unifier<Map<String, Object>> getUnifier()
+    {
+      OrderByOperator unifier = new OrderByOperator();
+      for (OrderByRule<?> rule : getOrderByRules()) {
+        unifier.addOrderByRule(rule);
+      }
+      unifier.setDescending(isDescending);
+      return unifier;
+    }
+  };
+
+  /**
+   * @return the order by rules, in the order they are applied
+   */
+  public ArrayList<OrderByRule<?>> getOrderByRules()
+  {
+    return orderByRules;
+  }
+
+  /**
+   * The order by rules used to order incoming tuples.
+   * @param orderByRules the order by rules to set
+   */
+  public void setOrderByRules(ArrayList<OrderByRule<?>> orderByRules)
+  {
+    this.orderByRules = orderByRules;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
new file mode 100644
index 0000000..cc90354
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByRule.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Orders rows by the natural ordering of the value stored under one column. <br>
+ * <p>
+ * <b>Properties : </b> <br>
+ * <b> columnName : </b> Name of column for ordering tuples. <br>
+ * @displayName OrderBy Rule
+ * @category Stream Manipulators
+ * @tags orderby, sort, comparison
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@SuppressWarnings("rawtypes")
+public class OrderByRule<T extends Comparable>
+{
+
+  /**
+   * column name for ordering tuples.
+   */
+  private String columnName;
+
+  public OrderByRule(String name)
+  {
+    columnName = name;
+  }
+
+  /**
+   * Sorts rows in ascending natural order of their {@link #columnName} value.
+   * Rows with equal values keep their input order; rows whose value is missing or null are appended last.
+   * @param rows rows to sort; the list itself is not modified
+   * @return a new list containing every input row
+   */
+  @SuppressWarnings("unchecked")
+  public ArrayList<Map<String, Object>> sort(ArrayList<Map<String, Object>> rows)
+  {
+    ArrayList<Map<String, Object>> unsorted = new ArrayList<Map<String, Object>>();
+    TreeMap<T, ArrayList<Map<String, Object>>> sorted = new TreeMap<T, ArrayList<Map<String, Object>>>();
+    for (Map<String, Object> row : rows) {
+      T value = (T)row.get(columnName);
+      if (value == null) { // missing or null values cannot be TreeMap keys; keep the row instead of dropping it
+        unsorted.add(row);
+        continue;
+      }
+      ArrayList<Map<String, Object>> list = sorted.get(value);
+      if (list == null) {
+        list = new ArrayList<Map<String, Object>>();
+        sorted.put(value, list);
+      }
+      list.add(row);
+    }
+    ArrayList<Map<String, Object>> result = new ArrayList<Map<String, Object>>(rows.size());
+    for (ArrayList<Map<String, Object>> group : sorted.values()) {
+      result.addAll(group);
+    }
+    result.addAll(unsorted);
+    return result;
+  }
+
+  /**
+   * @return the columnName
+   */
+  public String getColumnName()
+  {
+    return columnName;
+  }
+
+  /**
+   * @param columnName
+   *          the columnName to set
+   */
+  public void setColumnName(String columnName)
+  {
+    this.columnName = columnName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
new file mode 100644
index 0000000..9a8fde8
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/OuterJoinOperator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+/**
+ * An operator that provides SQL left, right and full outer join semantics on a live stream. <br>
+ * <p>
+ * Please refer to {@link InnerJoinOperator} for
+ * details.
+ *
+ * <b> Properties : </b> <br>
+ * <b> isLeftJoin : </b> Left join flag (default true); false selects a right join. <br>
+ * <b> isFullJoin : </b> Full join flag; takes precedence over isLeftJoin when set. <br>
+ * @displayName Outer Join
+ * @category Stream Manipulators
+ * @tags sql, outer join operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class OuterJoinOperator extends InnerJoinOperator
+{
+
+  private boolean isLeftJoin = true;
+  private boolean isFullJoin = false;
+
+  /**
+   * Emits each row that found no join partner in the window, passing {@code null} for the missing side:
+   * unmatched table1 rows for a left join, unmatched table2 rows for a right join, both for a full join.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (isFullJoin || isLeftJoin) {
+      for (int i = 0; i < table1.size(); i++) {
+        if (!isTable1RowMatched(i)) {
+          joinRows(table1.get(i), null);
+        }
+      }
+    }
+    if (isFullJoin || !isLeftJoin) {
+      for (int i = 0; i < table2.size(); i++) {
+        if (!isTable2RowMatched(i)) { // only output non merged rows
+          joinRows(null, table2.get(i));
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if table1 row {@code i} satisfies the join condition with at least one table2 row
+   */
+  private boolean isTable1RowMatched(int i)
+  {
+    for (int j = 0; j < table2.size(); j++) {
+      if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) {
+        return true; // the first match is enough; no need to scan the rest of table2
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @return true if table2 row {@code i} satisfies the join condition with at least one table1 row
+   */
+  private boolean isTable2RowMatched(int i)
+  {
+    for (int j = 0; j < table1.size(); j++) {
+      if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Selects a left outer join (the default); has no effect while full join is enabled.
+   */
+  public void setLeftJoin()
+  {
+    isLeftJoin = true;
+  }
+
+  /**
+   * Selects a right outer join; no effect while full join is enabled (name typo kept for compatibility).
+   */
+  public void setRighttJoin()
+  {
+    isLeftJoin = false;
+  }
+
+  /**
+   * @return true if unmatched rows of both tables are emitted
+   */
+  public boolean isFullJoin()
+  {
+    return isFullJoin;
+  }
+
+  /**
+   * @param isFullJoin true for a full outer join, which emits unmatched rows of both tables
+   */
+  public void setFullJoin(boolean isFullJoin)
+  {
+    this.isFullJoin = isFullJoin;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
new file mode 100644
index 0000000..6e8008b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectFunctionOperator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+/**
+ * An implementation of Operator that applies SQL functions to all rows of an application window and emits one result row. <br>
+ * <p>
+ * <b>Stateful : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b>inport : </b> expects {@code Map<String, Object>} rows. <br>
+ * <b>outport : </b> emits one {@code Map<String, Object>} per window, filled by the functions' filter calls. <br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> functions : </b> SQL functions applied to the window's rows, added with addSqlFunction. <br>
+ * @displayName Select Function
+ * @category Stream Manipulators
+ * @tags sql top, sql limit, sql select operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class SelectFunctionOperator implements Operator
+{
+  /**
+   * Rows collected during the current application window.
+   */
+  private ArrayList<Map<String, Object>> rows;
+
+  /**
+   * SQL functions evaluated over the collected rows at the end of each window.
+   */
+  private ArrayList<FunctionIndex> functions = new ArrayList<FunctionIndex>();
+
+  /**
+   * Input port that takes {@code Map<String, Object>} rows; they are buffered until endWindow.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+
+    @Override
+    public void process(Map<String, Object> row)
+    {
+      rows.add(row);
+    }
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    // nothing to initialize; the row buffer is created in beginWindow
+
+  }
+
+  @Override
+  public void teardown()
+  {
+    // no resources to release
+
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    rows = new ArrayList<Map<String, Object>>();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (functions.size() == 0) { // no functions configured: nothing is emitted for this window
+      return;
+    }
+    Map<String, Object> collect = new HashMap<String, Object>();
+    for (FunctionIndex function : functions) {
+      try {
+        function.filter(rows, collect);
+      } catch (Exception e) {
+        e.printStackTrace(); // NOTE(review): failure is only printed and the window's result is dropped; consider logging/rethrowing
+        return;
+      }
+    }
+    outport.emit(collect);
+  }
+
+  /**
+   * Output port that emits one {@code Map<String, Object>} of function results per window.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+  /**
+   * Adds a SQL function to evaluate over each window's rows.
+   * @param function SQL function for rows.
+   */
+  public void addSqlFunction(FunctionIndex function)
+  {
+    functions.add(function);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
new file mode 100644
index 0000000..08799cb
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * An implementation of BaseOperator that provides SQL SELECT query semantics on a live data stream. <br>
+ * <p>
+ * Rows accepted by the condition are passed through the select indexes and emitted on the output port. <br>
+ * <br>
+ * <b>Stateful : No,</b> each row is processed independently as it arrives. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * {@code Map<String, Object>}<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * {@code Map<String, Object>}<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> condition : </b> Select condition for selecting rows; null accepts every row. <br>
+ * <b> indexes : </b> Select columns/expressions added with addIndex; with none, the input row is emitted as is. <br>
+ * <br>
+ * @displayName Select
+ * @category Stream Manipulators
+ * @tags sql select operator, index, sql condition
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class SelectOperator extends BaseOperator
+{
+
+  /**
+   * Select columns/expressions; each one fills the result row from the input row.
+   */
+  private ArrayList<Index> indexes = new ArrayList<Index>();
+
+  /**
+   * Row filter; null accepts every row.
+   */
+  private Condition condition = null;
+
+  /**
+   * Adds a select column/expression.
+   */
+  public void addIndex(Index index)
+  {
+    indexes.add(index);
+  }
+
+  /**
+   * Sets the row filter; null accepts every row.
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
+
+  /**
+   * Input port that takes {@code Map<String, Object>} rows; accepted rows are emitted immediately.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        return;
+      }
+      if (indexes.size() == 0) { // no projection configured: forward the input row itself
+        outport.emit(tuple);
+        return;
+      }
+      Map<String, Object> result = new HashMap<String, Object>();
+      for (int i = 0; i < indexes.size(); i++) {
+        indexes.get(i).filter(tuple, result);
+      }
+      outport.emit(result);
+    }
+  };
+
+  /**
+   * Output port that emits the selected {@code Map<String, Object>} rows.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
new file mode 100644
index 0000000..2880198
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * An implementation of Operator that provides SQL TOP select query semantics on a live data stream.
<br>
+ * <p>
+ * The first topValue rows (or topValue percent of the rows) of each application window are emitted in endWindow. <br>
+ * <br>
+ * <b>Stateful : Yes,</b> rows are buffered over the application window. <br>
+ * <b>Partitions : No, </b> each partition would emit its own top rows (the output port has no unifier). <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects
+ * {@code Map<String, Object>}<br>
+ * <b> outport : </b> Output hash map(row) port, emits
+ * {@code Map<String, Object>}<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> topValue : </b> number of rows to emit per window, or a percentage when isPercentage is set. <br>
+ * <b> isPercentage : </b> treat topValue as a percentage of the rows received in the window.
+ * <br>
+ * @displayName Select Top
+ * @category Stream Manipulators
+ * @tags sql select, sql top operator
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class SelectTopOperator implements Operator
+{
+  private ArrayList<Map<String, Object>> list;
+  private int topValue = 1;
+  private boolean isPercentage = false;
+
+  /**
+   * Input port that takes {@code Map<String, Object>} rows; they are buffered until endWindow.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      list.add(tuple);
+    }
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    // nothing to initialize; the row buffer is created in beginWindow
+  }
+
+  @Override
+  public void teardown()
+  {
+    // no resources to release
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    list = new ArrayList<Map<String, Object>>();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    int numEmits = topValue;
+    if (isPercentage) {
+      // multiply before dividing: topValue / 100 alone truncates to 0 for every percentage below 100
+      numEmits = (int)((long)list.size() * topValue / 100); // long arithmetic avoids int overflow
+    }
+    for (int i = 0; (i < numEmits) && (i < list.size()); i++) {
+      outport.emit(list.get(i));
+    }
+  }
+
+  public int getTopValue()
+  {
+    return topValue;
+  }
+
+  /** @param topValue rows (or percent of rows) to emit per window; must be positive, else IllegalArgumentException */
+  public void setTopValue(int topValue) throws Exception
+  {
+    if (topValue <= 0) {
+      throw new IllegalArgumentException("Top value must be positive number.");
+    }
+    this.topValue = topValue;
+  }
+
+  public boolean isPercentage()
+  {
+    return isPercentage;
+  }
+
+  public void setPercentage(boolean isPercentage)
+  {
+    this.isPercentage = isPercentage;
+  }
+
+  /**
+   * Output port that emits the selected {@code Map<String, Object>} rows.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
new file mode 100644
index 0000000..52ddac6
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * An implementation of BaseOperator that provides SQL UPDATE query semantics on a live data stream. <br>
+ * <p>
+ * Rows accepted by the condition are emitted with the configured update values replacing matching columns. <br>
+ * <br>
+ * <b>Stateful : No,</b> each row is processed independently as it arrives. <br>
+ * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b> inport : </b> Input hash map(row) port, expects {@code Map<String, Object>}<br>
+ * <b> outport : </b> Output hash map(row) port, emits {@code Map<String, Object>}<br>
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> condition : </b> Row filter; rows it rejects are dropped, null accepts every row. <br>
+ * <b> updates : </b> Column name to new value map, filled with addUpdate; columns absent from a row are not added. <br>
+ * <br>
+ * @displayName Update
+ * @category Stream Manipulators
+ * @tags sql update operator, sql condition
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class UpdateOperator extends BaseOperator
+{
+  /**
+   * New values keyed by column name; only columns already present in a row are replaced.
+   */
+  Map<String, Object> updates = new HashMap<String, Object>();
+
+  /**
+   * Row filter; null accepts every row.
+   */
+  private Condition condition = null;
+
+  /**
+   * Sets the row filter; null accepts every row.
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
+
+  /**
+   * Input port that takes {@code Map<String, Object>} rows; accepted rows are emitted immediately.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        return;
+      }
+      if (updates.size() == 0) { // nothing to update: forward the input row itself
+        outport.emit(tuple);
+        return;
+      }
+      Map<String, Object> result = new HashMap<String, Object>();
+      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+        if (updates.containsKey(entry.getKey())) {
+          result.put(entry.getKey(), updates.get(entry.getKey()));
+        } else {
+          result.put(entry.getKey(), entry.getValue());
+        }
+      }
+      outport.emit(result);
+    }
+  };
+
+  /**
+   * Output port that emits the updated {@code Map<String, Object>} rows.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
+
+  /**
+   * Adds (or replaces) the value written to column {@code name} in emitted rows that contain that column.
+   */
+  public void addUpdate(String name, Object value)
+  {
+    updates.put(name, value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
new file mode 100644
index 0000000..155470c
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/BetweenCondition.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * A derivation of Condition that accepts a row whose value for the given column lies in [leftValue, rightValue]. <br>
+ * <p>
+ * <b>Properties : </b> <br>
+ * <b> column : </b> Name of the column to check. <br>
+ * <b> leftValue : </b> inclusive lower bound of the column value. <br>
+ * <b> rightValue : </b> inclusive upper bound of the column value. <br>
+ * <br>
+ * @displayName Between Condition
+ * @category Stream Manipulators
+ * @tags sql condition
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class BetweenCondition extends Condition
+{
+  /**
+   * Column name to be checked.
+   */
+  @NotNull
+  private String column;
+
+  /**
+   * Inclusive lower bound, compared with {@code Comparable.compareTo}.
+   */
+  @NotNull
+  private Object leftValue;
+
+  /**
+   * Inclusive upper bound, compared with {@code Comparable.compareTo}.
+   */
+  @NotNull
+  private Object rightValue;
+
+  /**
+   * @param column Name of column, must be non-null. <br>
+   * @param leftValue Inclusive lower bound for the value, must be non-null. <br>
+   * @param rightValue Inclusive upper bound for the value, must be non-null. <br>
+   */
+  public BetweenCondition(@NotNull String column, @NotNull Object leftValue, @NotNull Object rightValue)
+  {
+    this.column = column;
+    this.leftValue = leftValue;
+    this.rightValue = rightValue;
+  }
+
+  /**
+   * Returns true if the row has a non-null value for column with {@code leftValue <= value <= rightValue}.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public boolean isValidRow(@NotNull Map<String, Object> row)
+  {
+    if (!row.containsKey(column)) {
+      return false;
+    }
+    Object value = row.get(column);
+    if (value == null) {
+      return false;
+    }
+    if (((Comparable)value).compareTo((Comparable)leftValue) < 0) {
+      return false;
+    }
+    if (((Comparable)value).compareTo((Comparable)rightValue) > 0) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Not supported: this condition only validates single rows (fails an assertion when assertions are enabled).
+   */
+  @Override
+  public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, Object> row2)
+  {
+    assert (false);
+    return false;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
new file mode 100644
index 0000000..d606991
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/CompoundCondition.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.condition;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+
+/**
+ * A derivation of Condition that combines two conditions with a logical AND or OR. <br>
+ * <p>
+ * Row validation and join validation are both combined with the same operator. More complex and/or
+ * expressions can be built by nesting CompoundConditions.
+ * <br>
+ * <b> Properties : </b> <br>
+ * <b> leftCondition : </b> Left operand condition. <br>
+ * <b> rightCondition : </b> Right operand condition. <br>
+ * <b> logicalOr : </b> true for OR (the default), false for AND. <br>
+ * <br>
+ * @displayName Compound Condition
+ * @category Stream Manipulators
+ * @tags sql condition, logical
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class CompoundCondition extends Condition
+{
+  /**
+   * Left operand condition.
+   */
+  @NotNull
+  private Condition leftCondition;
+
+  /**
+   * Right operand condition.
+   */
+  @NotNull
+  private Condition rightCondition;
+
+  /**
+   * true combines the operands with OR, false with AND.
+   */
+  private boolean logicalOr = true;
+
+  /**
+   * Creates the logical OR of two conditions.
+   * @param leftCondition Left operand condition, must be non-null. <br>
+   * @param rightCondition Right operand condition, must be non-null. <br>
+   */
+  public CompoundCondition(Condition leftCondition, Condition rightCondition)
+  {
+    this.leftCondition = leftCondition;
+    this.rightCondition = rightCondition;
+  }
+
+  /**
+   * Creates the logical AND of two conditions if isLogicalAnd is true, otherwise their logical OR.
+   * @param leftCondition Left operand condition, must be non-null. <br>
+   * @param rightCondition Right operand condition, must be non-null. <br>
+   * @param isLogicalAnd Logical AND if true.
+   */
+  public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd)
+  {
+    this.leftCondition = leftCondition;
+    this.rightCondition = rightCondition;
+    logicalOr = !isLogicalAnd;
+  }
+
+  /** Evaluates both operands on the row, combined with OR or AND (the right operand is short-circuited). */
+  @Override
+  public boolean isValidRow(Map<String, Object> row)
+  {
+    if (logicalOr) {
+      return leftCondition.isValidRow(row) || rightCondition.isValidRow(row);
+    } else {
+      return leftCondition.isValidRow(row) && rightCondition.isValidRow(row);
+    }
+  }
+
+  /** Evaluates both operands' join checks on the row pair, combined with OR or AND exactly like isValidRow. */
+  @Override
+  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
+  {
+    boolean left = leftCondition.isValidJoin(row1, row2);
+    return logicalOr ? (left || rightCondition.isValidJoin(row1, row2)) : (left && rightCondition.isValidJoin(row1, row2));
+  }
+
+  public Condition getLeftCondition()
+  {
+    return leftCondition;
+  }
+
+  public void setLeftCondition(Condition leftCondition)
+  {
+    this.leftCondition = leftCondition;
+  }
+
+  public Condition getRightCondition()
+  {
+    return rightCondition;
+  }
+
+  public void setRightCondition(Condition rightCondition)
+  {
+    this.rightCondition = rightCondition;
+  }
+
+  /** Switches this condition to a logical AND of its operands. */
+  public void setLogicalAnd()
+  {
+    this.logicalOr = false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java
new
file mode 100644 index 0000000..a54960d --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/EqualValueCondition.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.condition; + +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.lib.streamquery.condition.Condition; + +/** + * An implementation of condition on column equality. + * <p> + * A valid row must have all key/value map in column name/value map. + * + * <b> Properties : </b> <br> + * <b> equalMap : </b> Column equal value map store. + * @displayName Equal Value Condition + * @category Stream Manipulators + * @tags sql condition + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class EqualValueCondition extends Condition +{ + + /** + * Equal column map. + */ + private HashMap<String, Object> equalMap = new HashMap<String, Object>(); + + /** + * Add column equal condition. + */ + public void addEqualValue(String column, Object value) + { + equalMap.put(column, value); + } + + /** + * Check valid row. 
+ */ + @Override + public boolean isValidRow(Map<String, Object> row) + { + // no conditions + if (equalMap.size() == 0) { + return true; + } + + // compare each condition value + for (Map.Entry<String, Object> entry : equalMap.entrySet()) { + if (!row.containsKey(entry.getKey())) { + return false; + } + Object value = row.get(entry.getKey()); + if (entry.getValue() == null) { + if (value == null) { + return true; + } + return false; + } + if (value == null) { + return false; + } + if (!entry.getValue().equals(value)) { + return false; + } + } + return true; + } + + /** + * check valid join, not implemented + * + * @return false + */ + @Override + public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java new file mode 100644 index 0000000..6e0400b --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCompareValue.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.condition; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex; + +/** + * A derivation of HavingCondition that implements comparison of aggregate index value to input compare value. <br> + * <p> + * Compare value must implement interface Comparable. <br> + * <br> + * <b> Properties : </b> + * <b> compareValue : </b> Value to be compared. <br> + * <b> compareType : </b> Type of comparison -1 == lt, 0 == eq, 1 == gt. <br> + * @displayName Having Compare Value + * @category Stream Manipulators + * @tags compare, sql condition + * @since 0.3.4 + * @deprecated + */ +@Deprecated +@SuppressWarnings("rawtypes") +public class HavingCompareValue<T extends Comparable> extends HavingCondition +{ + /** + * Value to be compared. + */ + private T compareValue; + + /** + * Type of comparison -1 == lt, 0 == eq, 1 == gt. + */ + private int compareType; + + /** + * @param aggregateIndex aggregate index for comparison. <br> + * @param compareValue Value to be compared. <br> + * @param compareType Type of comparison -1 == lt, 0 == eq, 1 == gt. <br> + */ + public HavingCompareValue(FunctionIndex aggregateIndex, T compareValue, int compareType) + { + super(aggregateIndex); + this.compareValue = compareValue; + this.compareType = compareType; + } + + /** + * Validate aggregate override. 
<br> + */ + @SuppressWarnings("unchecked") + @Override + public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception + { + Object computed = aggregateIndex.compute(rows); + return (compareType == compareValue.compareTo(computed)); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java new file mode 100644 index 0000000..66ef35c --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/HavingCondition.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.streamquery.condition; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex; + +/** + * A base class for Group,Having operator with aggregate index constraint.&nsbsp; Subclasses should provide the + implementation to check if aggregate is valid. + * <p> + * @displayName Having Condition + * @category Stream Manipulators + * @tags sql condition, index, group + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public abstract class HavingCondition +{ + /** + * Aggregate index to be validated. + */ + protected FunctionIndex aggregateIndex = null; + + /** + * @param aggregateIndex Aggregate index to be validated. + */ + public HavingCondition(FunctionIndex aggregateIndex) + { + this.aggregateIndex = aggregateIndex; + } + + /** + * Check if aggregate is valid. + */ + public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java new file mode 100644 index 0000000..d19bb99 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/InCondition.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.condition; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.condition.Condition; + +/** + * An implementation of condition class to check if a column value is in a given set of values. + * <p> + * <br> + * <b>Properties : </b> <br> + * <b> column : </b> Column name for which value is checked in values set. <br> + * <b> inValues : </b> Set of values in which column value is checked. <br> + * @displayName In Condition + * @category Stream Manipulators + * @tags sql condition + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class InCondition extends Condition +{ + /** + * Column name for which value is checked in values set. + */ + @NotNull + private String column; + + /** + * Set of values in which column value is checked. + */ + private Set<Object> inValues = new HashSet<Object>(); + + /** + * @param column Column name for which value is checked in values set. 
+ */ + public InCondition(@NotNull String column) + { + this.column = column; + } + + @Override + public boolean isValidRow(@NotNull Map<String, Object> row) + { + if (!row.containsKey(column)) { + return false; + } + return inValues.contains(row.get(column)); + } + + @Override + public boolean isValidJoin(@NotNull Map<String, Object> row1, @NotNull Map<String, Object> row2) + { + return false; + } + + public String getColumn() + { + return column; + } + + public void setColumn(String column) + { + this.column = column; + } + + public void addInValue(Object value) + { + this.inValues.add(value); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java new file mode 100644 index 0000000..a8789fa --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/condition/LikeCondition.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.condition; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.condition.Condition; + +/** + * An implementation of condition class to filter rows for which given column name value matches given regular expression. <br> + *<p> + *<b> Properties : </b> <br> + *<b> column : < /b> Column to be matched with regular expression. <br> + *<b> pattern : </b> Regular expression pattern.<br> + * @displayName Like Condition + * @category Stream Manipulators + * @tags sql, like condition, regular expression + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class LikeCondition extends Condition +{ + /** + * Column to be matched with regular expression. + */ + @NotNull + private String column; + + /** + * Regular expression pattern. + */ + @NotNull + private Pattern pattern; + + /** + * @param column Column to be matched with regular expression, must be non-null. + * @param pattern Regular expression pattern, must be non-null. + */ + public LikeCondition(@NotNull String column,@NotNull String pattern) + { + setColumn(column); + setPattern(pattern); + } + + /** + * For valid row column value string must match regular expression. + * @return row valid status. + */ + @Override + public boolean isValidRow(Map<String, Object> row) + { + if (!row.containsKey(column)) { + return false; + } + Matcher match = pattern.matcher((CharSequence)row.get(column)); + return match.find(); + } + + /** + * Must not be called. 
+ */ + @Override + public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) + { + assert (false); + return false; + } + + public String getColumn() + { + return column; + } + + public void setColumn(String column) + { + this.column = column; + } + + public void setPattern(String pattern) + { + this.pattern = Pattern.compile(pattern); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java new file mode 100644 index 0000000..72ac59f --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/AverageFunction.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang.StringUtils; + +/** + * An implementation of function index that implements average function semantics. <br> + * <p> + * e.g : sql => SELECT AVG(column_name) FROM table_name. <br> + * <br> + * <b> Properties : </b> <br> + * <b> column : </b> Aggregate over given column values. <br> + * <b> alias : </b> Alias name for aggregate output. <br> + * @displayName Average Function + * @category Stream Manipulators + * @tags sql average + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class AverageFunction extends FunctionIndex +{ + /** + * @param column Aggregate over given column values, must be non null. + * @param alias Alias name for aggregate output. + */ + public AverageFunction(@NotNull String column, String alias) + { + super(column, alias); + } + + /** + * Compute average for given column values. + */ + @Override + public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception + { + if (rows.size() == 0) { + return 0.0; + } + double sum = 0.0; + for (Map<String, Object> row : rows) { + sum += ((Number)row.get(column)).doubleValue(); + } + return sum / rows.size(); + } + + /** + * Get aggregate name. + * @return name. + */ + @Override + protected String aggregateName() + { + if (!StringUtils.isEmpty(alias)) { + return alias; + } + return "AVG(" + column + ")"; + } +}
