http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java deleted file mode 100644 index 18d9928..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.ArrayList; -import java.util.Map; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.Unifier; - -/** - * An implementation of Operator that provides sql order by operator semantic over live stream data. <br> - * <p> - * Input data rows are ordered by order rules, ordered result is emitted on output port. <br> - * <br> - * * <br> - * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br> - * <b>Partitions : Yes, </b> This operator is also unifier on output port. <br> - * <br> - * <b>Ports</b>:<br> - * <b> inport : </b> Input hash map(row) port, expects HashMap<String,Object><<br> - * <b> outport : </b> Output hash map(row) port, emits HashMap<String,Object><br> - * <br> - * <b> Properties : </b> <br> - * <b> orderByRules : </b>List of order by rules for tuples. - * @displayName OrderBy - * @category Stream Manipulators - * @tags orderby operator - * @since 0.3.5 - */ -public class OrderByOperator implements Operator, Unifier<Map<String, Object>> -{ - /** - * Order by rules. - */ - ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>(); - - /** - * Descending flag. - */ - private boolean isDescending; - - /** - * collected rows. - */ - private ArrayList<Map<String, Object>> rows; - - /** - * Add order by rule. - */ - public void addOrderByRule(OrderByRule<?> rule) - { - orderByRules.add(rule); - } - - /** - * @return isDescending - */ - public boolean isDescending() - { - return isDescending; - } - - /** - * @param isDescending isDescending - */ - public void setDescending(boolean isDescending) - { - this.isDescending = isDescending; - } - - @Override - public void process(Map<String, Object> tuple) - { - rows.add(tuple); - } - - @Override - public void beginWindow(long arg0) - { - rows = new ArrayList<Map<String, Object>>(); - } - - @Override - public void endWindow() - { - for (int i = 0; i < orderByRules.size(); i++) { - rows = orderByRules.get(i).sort(rows); - } - if (isDescending) { - for (int i = 0; i < rows.size(); i++) { - outport.emit(rows.get(i)); - } - } else { - for (int i = rows.size() - 1; i >= 0; i--) { - outport.emit(rows.get(i)); - } - } - } - - @Override - public void setup(OperatorContext arg0) - { - // TODO Auto-generated method stub - - } - - @Override - public void teardown() - { - // TODO Auto-generated method stub - - } - - /** - * Input port that takes a map of <string,object>. - */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() - { - @Override - public void process(Map<String, Object> tuple) - { - rows.add(tuple); - } - }; - - /** - * Output port that emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>() - { - @Override - public Unifier<Map<String, Object>> getUnifier() - { - OrderByOperator unifier = new OrderByOperator(); - for (int i = 0; i < getOrderByRules().size(); i++) { - unifier.addOrderByRule(getOrderByRules().get(i)); - } - unifier.setDescending(isDescending); - return unifier; - } - }; - - /** - * @return the orderByRules - */ - public ArrayList<OrderByRule<?>> getOrderByRules() - { - return orderByRules; - } - - /** - * The order by rules used to order incoming tuples. - * @param oredrByRules the orderByRules to set - */ - public void setOrderByRules(ArrayList<OrderByRule<?>> oredrByRules) - { - this.orderByRules = oredrByRules; - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java deleted file mode 100644 index 8573903..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.ArrayList; -import java.util.Map; -import java.util.TreeMap; - -/** - * Implements order by key name rule. <br> - * <p> - * <b>Properties : </b> <br> - * <b> columnName : </b> Name of column for ordering tuples. <br> - * @displayName OrderBy Rule - * @category Stream Manipulators - * @tags orderby, sort, comparison - * @since 0.3.3 - */ -@SuppressWarnings("rawtypes") -public class OrderByRule<T extends Comparable> -{ - - /** - * column name for ordering tuples. - */ - private String columnName; - - public OrderByRule(String name) - { - - columnName = name; - } - - /** - * sort rows by each rule and emit result on output port. - */ - @SuppressWarnings("unchecked") - public ArrayList<Map<String, Object>> sort(ArrayList<Map<String, Object>> rows) - { - - TreeMap<T, ArrayList<Map<String, Object>>> sorted = new TreeMap<T, ArrayList<Map<String, Object>>>(); - for (int i = 0; i < rows.size(); i++) { - Map<String, Object> row = rows.get(i); - if (row.containsKey(columnName)) { - T value = (T)row.get(columnName); - ArrayList<Map<String, Object>> list; - if (sorted.containsKey(value)) { - list = sorted.get(value); - } else { - list = new ArrayList<Map<String, Object>>(); - sorted.put(value, list); - } - list.add(row); - } - } - ArrayList<Map<String, Object>> result = new ArrayList<Map<String, Object>>(); - for (Map.Entry<T, ArrayList<Map<String, Object>>> entry : sorted.entrySet()) { - result.addAll(entry.getValue()); - } - return result; - } - - /** - * @return the columnName - */ - public String getColumnName() - { - - return columnName; - } - - /** - * @param columnName - * the columnName to set - */ - public void setColumnName(String columnName) - { - - this.columnName = columnName; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java deleted file mode 100644 index 0494bfb..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -/** - * An operator that provides sql left,right and full outer join metric semantics on live stream. <br> - * <p> - * Please refer to {@link com.datatorrent.lib.streamquery.InnerJoinOperator} for - * details. - * - * <b> Properties : </b> <br> - * <b> isLeftJoin : </b> Left join flag. <br> - * <b> isFullJoin : </b> Full join flag. <br> - * @displayName Outer Join - * @category Stream Manipulators - * @tags sql, outer join operator - * @since 0.3.4 - */ -public class OuterJoinOperator extends InnerJoinOperator -{ - - private boolean isLeftJoin = true; - private boolean isFullJoin = false; - - @Override - public void endWindow() - { - // full outer join - if (isFullJoin) { - for (int i = 0; i < table1.size(); i++) { - boolean merged = false; - for (int j = 0; j < table2.size(); j++) { - if ((joinCondition == null) - || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) { - merged = true; - } - } - if (!merged) { - joinRows(table1.get(i), null); - } - } - for (int i = 0; i < table2.size(); i++) { - boolean merged = false; - for (int j = 0; j < table1.size(); j++) { - if ((joinCondition == null) - || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) { - merged = true; - } - } - if (!merged) { // only output non merged rows - joinRows(null, table2.get(i)); - } - } - return; - } - - // left or right join - if (isLeftJoin) { - for (int i = 0; i < table1.size(); i++) { - boolean merged = false; - for (int j = 0; j < table2.size(); j++) { - if ((joinCondition == null) - || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) { - merged = true; - } - } - if (!merged) { - joinRows(table1.get(i), null); - } - } - } else { - for (int i = 0; i < table2.size(); i++) { - boolean merged = false; - for (int j = 0; j < table1.size(); j++) { - if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) { - merged = true; - } - } - if (!merged) { // only output non merged rows - joinRows(null, table2.get(i)); - } - } - } - } - - public void setLeftJoin() - { - isLeftJoin = true; - } - - public void setRighttJoin() - { - isLeftJoin = false; - } - - public boolean isFullJoin() - { - return isFullJoin; - } - - public void setFullJoin(boolean isFullJoin) - { - this.isFullJoin = isFullJoin; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java deleted file mode 100644 index 77616f3..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.annotation.OperatorAnnotation; -import com.datatorrent.lib.streamquery.function.FunctionIndex; - -/** - * An implementation of Operator that applies sql top or limit semantics on incoming tuple(s). <br> - * <p> - * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br> - * <b>Partitions : No, </b> will yield wrong result(s). <br> - * <br> - * <b>Ports : </b> <br> - * <b>inport : </b> expect tuple for type T. <br> - * <b>outport : </b> emits tuple for type T. <br> - * <br> - * <b> Properties : </b> <br> - * <b> functions : </b> Sql function for rows. <br> - * @displayName Select Function - * @category Stream Manipulators - * @tags sql top, sql limit, sql select operator - * @since 0.3.4 - */ -@OperatorAnnotation(partitionable = false) -public class SelectFunctionOperator implements Operator -{ - /** - * array of rows. - */ - private ArrayList<Map<String, Object>> rows; - - /** - * Aggregate function for rows. - */ - private ArrayList<FunctionIndex> functions = new ArrayList<FunctionIndex>(); - - /** - * Input port that takes a map of <string,object>. - */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() - { - - @Override - public void process(Map<String, Object> row) - { - rows.add(row); - } - }; - - @Override - public void setup(OperatorContext context) - { - // TODO Auto-generated method stub - - } - - @Override - public void teardown() - { - // TODO Auto-generated method stub - - } - - @Override - public void beginWindow(long windowId) - { - rows = new ArrayList<Map<String, Object>>(); - } - - @Override - public void endWindow() - { - if (functions.size() == 0) { - return; - } - Map<String, Object> collect = new HashMap<String, Object>(); - for (FunctionIndex function : functions) { - try { - function.filter(rows, collect); - } catch (Exception e) { - e.printStackTrace(); - return; - } - } - outport.emit(collect); - } - - /** - * Output port that emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); - - /** - * Add sql function. - * @param function Sql function for rows. - */ - public void addSqlFunction(FunctionIndex function) - { - functions.add(function); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java deleted file mode 100644 index 4dbc1f0..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.streamquery.condition.Condition; -import com.datatorrent.lib.streamquery.index.Index; - -/** - * An implementation of that provides sql select query semantics on live data stream. <br> - * <p> - * Stream rows passing condition are emitted on output port stream. <br> - * <br> - * <b>StateFull : NO,</b> all row data is processed in current time window. <br> - * <b>Partitions : Yes, </b> No Input dependency among input rows. <br> - * <br> - * <b>Ports</b>:<br> - * <b> inport : </b> Input hash map(row) port, expects - * HashMap<String,Object><<br> - * <b> outport : </b> Output hash map(row) port, emits - * HashMap<String,Object><br> - * <br> - * <b> Properties : <b> <br> - * <b> condition : </b> Select condition for selecting rows. <br> - * <b> columns : </b> Column names/aggregate functions for select. <br> - * <br> - * @displayName Select - * @category Stream Manipulators - * @tags sql select operator, index, sql condition - * @since 0.3.3 - */ -public class SelectOperator extends BaseOperator -{ - - /** - * select columns/expression; - */ - private ArrayList<Index> indexes = new ArrayList<Index>(); - - /** - * condition. - */ - private Condition condition = null; - - /** - * add index. - */ - public void addIndex(Index index) - { - indexes.add(index); - } - - /** - * set condition. - */ - public void setCondition(Condition condition) - { - this.condition = condition; - } - - /** - * Input port that takes a map of <string,object>. - */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() - { - - @Override - public void process(Map<String, Object> tuple) - { - if ((condition != null) && (!condition.isValidRow(tuple))) { - return; - } - if (indexes.size() == 0) { - outport.emit(tuple); - return; - } - Map<String, Object> result = new HashMap<String, Object>(); - for (int i = 0; i < indexes.size(); i++) { - indexes.get(i).filter(tuple, result); - } - outport.emit(result); - } - }; - - /** - * Output port that emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java deleted file mode 100644 index c3ae083..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.ArrayList; -import java.util.Map; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; - -/** - * An implementation of Operator that provides sql top select query semantic on live data stream. <br> - * <p> - * Stream rows passing condition are emitted on output port stream. <br> - * <br> - * <b>StateFull : NO,</b> all row data is processed in current time window. <br> - * <b>Partitions : Yes, </b> No Input dependency among input rows. <br> - * <br> - * <b>Ports</b>:<br> - * <b> inport : </b> Input hash map(row) port, expects - * HashMap<String,Object><<br> - * <b> outport : </b> Output hash map(row) port, emits - * HashMap<String,Object><br> - * <br> - * <b> Properties : <b> <br> - * <b> topValue : </b> top values count. <br> - * <b> isPercentage : </b> top values count is percentage flag. - * <br> - * @displayName Select Top - * @category Stream Manipulators - * @tags sql select, sql top operator - * @since 0.3.4 - */ -public class SelectTopOperator implements Operator -{ - private ArrayList<Map<String, Object>> list; - private int topValue = 1; - private boolean isPercentage = false; - - /** - * Input port that takes a map of <string,object>. - */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() - { - @Override - public void process(Map<String, Object> tuple) - { - list.add(tuple); - } - }; - - @Override - public void setup(OperatorContext context) - { - // TODO Auto-generated method stub - - } - - @Override - public void teardown() - { - // TODO Auto-generated method stub - - } - - @Override - public void beginWindow(long windowId) - { - list = new ArrayList<Map<String, Object>>(); - } - - @Override - public void endWindow() - { - int numEmits = topValue; - if (isPercentage) { - numEmits = list.size() * (topValue / 100); - } - for (int i = 0; (i < numEmits) && (i < list.size()); i++) { - outport.emit(list.get(i)); - } - } - - public int getTopValue() - { - return topValue; - } - - public void setTopValue(int topValue) throws Exception - { - if (topValue <= 0) { - throw new Exception("Top value must be positive number."); - } - this.topValue = topValue; - } - - public boolean isPercentage() - { - return isPercentage; - } - - public void setPercentage(boolean isPercentage) - { - this.isPercentage = isPercentage; - } - - /** - * Output port that emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java deleted file mode 100644 index 6724a7e..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.streamquery.condition.Condition; - -/** - * An implementation of BaseOperator that provides sql update query semantic on live data stream. <br> - * <p> - * Stream rows passing condition are emitted on output port stream. <br> - * <br> - * <b>StateFull : NO,</b> all row data is processed in current time window. <br> - * <b>Partitions : Yes, </b> No Input dependency among input rows. <br> - * <br> - * <b>Ports</b>:<br> - * <b> inport : </b> Input hash map(row) port, expects HashMap<String,Object><<br> - * <b> outport : </b> Output hash map(row) port, emits HashMap<String,Object><br> - * <br> - * <b> Properties : <b> <br> - * <b> condition : </b> Select condition for selecting rows. <br> - * <b> columns : </b> Column names/aggregate functions for select. <br> - * <br> - * @displayName Update - * @category Stream Manipulators - * @tags sql update operator, sql condition - * @since 0.3.3 - */ -public class UpdateOperator extends BaseOperator -{ - /** - * Update value map. - */ - Map<String, Object> updates = new HashMap<String, Object>(); - - /** - * condition. - */ - private Condition condition = null; - - /** - * set condition. - */ - public void setCondition(Condition condition) - { - this.condition = condition; - } - - /** - * Input port that takes a map of <string,object>. - */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() - { - @Override - public void process(Map<String, Object> tuple) - { - if ((condition != null) && (!condition.isValidRow(tuple))) { - return; - } - if (updates.size() == 0) { - outport.emit(tuple); - return; - } - Map<String, Object> result = new HashMap<String, Object>(); - for (Map.Entry<String, Object> entry : tuple.entrySet()) { - if (updates.containsKey(entry.getKey())) { - result.put(entry.getKey(), updates.get(entry.getKey())); - } else { - result.put(entry.getKey(), entry.getValue()); - } - } - outport.emit(result); - } - }; - - /** - * Output port that emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); - - /** - * Add update value. - */ - public void addUpdate(String name, Object value) - { - updates.put(name, value); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java deleted file mode 100644 index 43cdc72..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * A derivation of Condition that validates row by checking if the given column name value lies between given left,right range. <br> - * <p> - * <b>Properties : </b> <br> - * <b> column : </b> Name of column. <br> - * <b> leftValue : </b> left range of column value. <br> - * <b> rightValue : </b> right range od column value. <br> - * <br> - * @displayName Between Condition - * @category Stream Manipulators - * @tags sql condition - * @since 0.3.4 - */ -public class BetweenCondition extends Condition -{ - /** - * Column name to be checked. - */ - @NotNull - private String column; - - /** - * Left range value. - */ - @NotNull - private Object leftValue; - - /** - * Right range value. - */ - @NotNull - private Object rightValue; - - /** - * @param column Name of column, must be non null. <br> - * @param leftValue Left range for value, mut be non null. <br> - * @param rightValue right range for value, mut be non null. <br> - */ - public BetweenCondition(@NotNull String column, @NotNull Object leftValue, @NotNull Object rightValue) - { - this.column = column; - this.leftValue = leftValue; - this.rightValue = rightValue; - } - - /** - * Validate given row. - */ - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public boolean isValidRow(@NotNull Map<String, Object> row) - { - if (!row.containsKey(column)) { - return false; - } - Object value = row.get(column); - if (value == null) { - return false; - } - if (((Comparable)value).compareTo((Comparable)leftValue) < 0) { - return false; - } - if (((Comparable)value).compareTo((Comparable)rightValue) > 0) { - return false; - } - return true; - } - - /** - * Must not be called. - */ - @Override - public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, Object> row2) - { - assert (false); - return false; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java deleted file mode 100644 index b4bd3ed..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * A derivation of Condition index that implements logical AND/OR select expression. <br> - * <p> - * Class provides logical OR or AND function specified in parameters. User can implement - * complex and/or expression by chaining operator itself. - * <br> - * <b> Properties : </b> <br> - * <b> leftCondition : </b> Left validate row condition . <br> - * <b> rightCondition : </b> Right validate row condition. <br> - * <b> logicalOr : </b> OR/AND logical metric flag. <br> - * <br> - * @displayName Compound Condition - * @category Stream Manipulators - * @tags sql condition, logical - * @since 0.3.4 - */ -public class CompoundCondition extends Condition -{ - /** - * Left validate row condition . - */ - @NotNull - private Condition leftCondition; - - /** - * Right validate row condition . - */ - @NotNull - private Condition rightCondition; - - /** - * AND/OR metric flag. - */ - private boolean logicalOr = true; - - /** - * Constructor for logical or metric. - * - * @param leftCondition Left validate row condition, must be non null. <br> - * @param rightCondition Right validate row condition, must be non null. <br> - */ - public CompoundCondition(Condition leftCondition, Condition rightCondition) - { - this.leftCondition = leftCondition; - this.rightCondition = rightCondition; - } - - /** - * Constructor for logical and metric if logical and parameter is true. - * <br> - * - * @param leftCondition Left validate row condition, must be non null. <br> - * @param rightCondition Right validate row condition, must be non null. <br> - * @param isLogicalAnd Logical AND if true. - */ - public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd) - { - this.leftCondition = leftCondition; - this.rightCondition = rightCondition; - logicalOr = !isLogicalAnd; - } - - @Override - public boolean isValidRow(Map<String, Object> row) - { - if (logicalOr) { - return leftCondition.isValidRow(row) || rightCondition.isValidRow(row); - } else { - return leftCondition.isValidRow(row) && rightCondition.isValidRow(row); - } - } - - @Override - public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) - { - // TODO Auto-generated method stub - return false; - } - - public Condition getLeftCondition() - { - return leftCondition; - } - - public void setLeftCondition(Condition leftCondition) - { - this.leftCondition = leftCondition; - } - - public Condition getRightCondition() - { - return rightCondition; - } - - public void setRightCondition(Condition rightCondition) - { - this.rightCondition = rightCondition; - } - - public void setLogicalAnd() - { - this.logicalOr = false; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java deleted file mode 100644 index bb478cf..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.HashMap; -import java.util.Map; - - -/** - * An implementation of condition on column equality. - * <p> - * A valid row must have all key/value map in column name/value map. - * - * <b> Properties : </b> <br> - * <b> equalMap : </b> Column equal value map store. - * @displayName Equal Value Condition - * @category Stream Manipulators - * @tags sql condition - * @since 0.3.4 - */ -public class EqualValueCondition extends Condition -{ - - /** - * Equal column map. - */ - private HashMap<String, Object> equalMap = new HashMap<String, Object>(); - - /** - * Add column equal condition. - */ - public void addEqualValue(String column, Object value) - { - equalMap.put(column, value); - } - - /** - * Check valid row. - */ - @Override - public boolean isValidRow(Map<String, Object> row) - { - // no conditions - if (equalMap.size() == 0) { - return true; - } - - // compare each condition value - for (Map.Entry<String, Object> entry : equalMap.entrySet()) { - if (!row.containsKey(entry.getKey())) { - return false; - } - Object value = row.get(entry.getKey()); - if (entry.getValue() == null) { - if (value == null) { - return true; - } - return false; - } - if (value == null) { - return false; - } - if (!entry.getValue().equals(value)) { - return false; - } - } - return true; - } - - /** - * check valid join, not implemented - * - * @return false - */ - @Override - public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) - { - return false; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java deleted file mode 100644 index 7877053..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import com.datatorrent.lib.streamquery.function.FunctionIndex; - -/** - * A derivation of HavingCondition that implements comparison of aggregate index value to input compare value. <br> - * <p> - * Compare value must implement interface Comparable. <br> - * <br> - * <b> Properties : </b> - * <b> compareValue : </b> Value to be compared. <br> - * <b> compareType : </b> Type of comparison -1 == lt, 0 == eq, 1 == gt. <br> - * @displayName Having Compare Value - * @category Stream Manipulators - * @tags compare, sql condition - * @since 0.3.4 - */ -@SuppressWarnings("rawtypes") -public class HavingCompareValue<T extends Comparable> extends HavingCondition -{ - /** - * Value to be compared. - */ - private T compareValue; - - /** - * Type of comparison -1 == lt, 0 == eq, 1 == gt. - */ - private int compareType; - - /** - * @param aggregateIndex aggregate index for comparison. <br> - * @param compareValue Value to be compared. <br> - * @param compareType Type of comparison -1 == lt, 0 == eq, 1 == gt. <br> - */ - public HavingCompareValue(FunctionIndex aggregateIndex, T compareValue, int compareType) - { - super(aggregateIndex); - this.compareValue = compareValue; - this.compareType = compareType; - } - - /** - * Validate aggregate override. <br> - */ - @SuppressWarnings("unchecked") - @Override - public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception - { - Object computed = aggregateIndex.compute(rows); - return (compareType == compareValue.compareTo(computed)); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java deleted file mode 100644 index 6dac690..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import com.datatorrent.lib.streamquery.function.FunctionIndex; - -/** - * A base class for Group,Having operator with aggregate index constraint.&nsbsp; Subclasses should provide the - implementation to check if aggregate is valid. - * <p> - * @displayName Having Condition - * @category Stream Manipulators - * @tags sql condition, index, group - * @since 0.3.4 - */ -public abstract class HavingCondition -{ - /** - * Aggregate index to be validated. - */ - protected FunctionIndex aggregateIndex = null; - - /** - * @param aggregateIndex Aggregate index to be validated. - */ - public HavingCondition(FunctionIndex aggregateIndex) - { - this.aggregateIndex = aggregateIndex; - } - - /** - * Check if aggregate is valid. - */ - public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java deleted file mode 100644 index 236f3b1..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import javax.validation.constraints.NotNull; - -/** - * An implementation of condition class to check if a column value is in a given set of values. - * <p> - * <br> - * <b>Properties : </b> <br> - * <b> column : </b> Column name for which value is checked in values set. <br> - * <b> inValues : </b> Set of values in which column value is checked. <br> - * @displayName In Condition - * @category Stream Manipulators - * @tags sql condition - * @since 0.3.4 - */ -public class InCondition extends Condition -{ - /** - * Column name for which value is checked in values set. - */ - @NotNull - private String column; - - /** - * Set of values in which column value is checked. - */ - private Set<Object> inValues = new HashSet<Object>(); - - /** - * @param column Column name for which value is checked in values set. - */ - public InCondition(@NotNull String column) - { - this.column = column; - } - - @Override - public boolean isValidRow(@NotNull Map<String, Object> row) - { - if (!row.containsKey(column)) { - return false; - } - return inValues.contains(row.get(column)); - } - - @Override - public boolean isValidJoin(@NotNull Map<String, Object> row1, @NotNull Map<String, Object> row2) - { - return false; - } - - public String getColumn() - { - return column; - } - - public void setColumn(String column) - { - this.column = column; - } - - public void addInValue(Object value) - { - this.inValues.add(value); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java index d350edc..4d8199e 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java @@ -23,7 +23,6 @@ import java.util.Map; import javax.validation.constraints.NotNull; - /** * An implementation of equal join condition class. * <p> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java deleted file mode 100644 index b3d7174..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.condition; - -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.validation.constraints.NotNull; - - -/** - * An implementation of condition class to filter rows for which given column name value matches given regular expression. <br> - *<p> - *<b> Properties : </b> <br> - *<b> column : < /b> Column to be matched with regular expression. <br> - *<b> pattern : </b> Regular expression pattern.<br> - * @displayName Like Condition - * @category Stream Manipulators - * @tags sql, like condition, regular expression - * @since 0.3.4 - */ -public class LikeCondition extends Condition -{ - /** - * Column to be matched with regular expression. - */ - @NotNull - private String column; - - /** - * Regular expression pattern. - */ - @NotNull - private Pattern pattern; - - /** - * @param column Column to be matched with regular expression, must be non-null. - * @param pattern Regular expression pattern, must be non-null. - */ - public LikeCondition(@NotNull String column,@NotNull String pattern) - { - setColumn(column); - setPattern(pattern); - } - - /** - * For valid row column value string must match regular expression. - * @return row valid status. - */ - @Override - public boolean isValidRow(Map<String, Object> row) - { - if (!row.containsKey(column)) { - return false; - } - Matcher match = pattern.matcher((CharSequence)row.get(column)); - return match.find(); - } - - /** - * Must not be called. - */ - @Override - public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) - { - assert (false); - return false; - } - - public String getColumn() - { - return column; - } - - public void setColumn(String column) - { - this.column = column; - } - - public void setPattern(String pattern) - { - this.pattern = Pattern.compile(pattern); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java deleted file mode 100644 index e212ff8..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.function; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang.StringUtils; - -/** - * An implementation of function index that implements average function semantics. <br> - * <p> - * e.g : sql => SELECT AVG(column_name) FROM table_name. <br> - * <br> - * <b> Properties : </b> <br> - * <b> column : </b> Aggregate over given column values. <br> - * <b> alias : </b> Alias name for aggregate output. <br> - * @displayName Average Function - * @category Stream Manipulators - * @tags sql average - * @since 0.3.4 - */ -public class AverageFunction extends FunctionIndex -{ - /** - * @param column Aggregate over given column values, must be non null. - * @param alias Alias name for aggregate output. - */ - public AverageFunction(@NotNull String column, String alias) - { - super(column, alias); - } - - /** - * Compute average for given column values. - */ - @Override - public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception - { - if (rows.size() == 0) { - return 0.0; - } - double sum = 0.0; - for (Map<String, Object> row : rows) { - sum += ((Number)row.get(column)).doubleValue(); - } - return sum / rows.size(); - } - - /** - * Get aggregate name. - * @return name. - */ - @Override - protected String aggregateName() - { - if (!StringUtils.isEmpty(alias)) { - return alias; - } - return "AVG(" + column + ")"; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java deleted file mode 100644 index dafe54e..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.function; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang.StringUtils; - -/** - * An implementation of function index that implements sql count function semantic. <br> - * <p> - * Counts number of values of given column and returns count of non null values in column. - * e.g : sql => SELECT COUNT(column_name) FROM table_name. <br> - * <br> - * <b> Properties : </b> <br> - * <b> column : </b> column name for values count. <br> - * <b> alias : </b> Alias name for aggregate output. <br> - * @displayName Count Function - * @category Stream Manipulators - * @tags sql count - * @since 0.3.4 - */ -public class CountFunction extends FunctionIndex -{ - /** - * @param column column for values count, must be non null. - * @param alias Alias name for aggregate output. - */ - public CountFunction(@NotNull String column, String alias) - { - super(column, alias); - } - - /** - * Count number of values of given column. - * @return Count of non null values in column. - */ - @Override - public Object compute(ArrayList<Map<String, Object>> rows) throws Exception - { - if (column.equals("*")) { - return rows.size(); - } - long count = 0; - for (Map<String, Object> row : rows) { - if (row.containsKey(column) && (row.get(column) != null)) { - count++; - } - } - return count; - } - - /** - * Aggregate output name. - * @return name string. - */ - @Override - protected String aggregateName() - { - if (!StringUtils.isEmpty(alias)) { - return alias; - } - return "COUNT(" + column + ")"; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java deleted file mode 100644 index db2c2b7..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.function; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang.StringUtils; - -/** - * An implementation of function index that implements sql first,last function semantic. <br> - * <p> - * e.g : sql => SELECT FIRST/LAST(column_name) FROM table_name. <br> - * <br> - * <b> Properties : </b> <br> - * <b> column : </b> column name for first/last value. <br> - * <b> alias : </b> Alias name for output. <br> - * <b> isFirst : </b> return first value if true. - * @displayName First Last Function - * @category Stream Manipulators - * @tags sql first, sql last - * @since 0.3.4 - */ -public class FirstLastFunction extends FunctionIndex -{ - /** - * return first value if true. - */ - private boolean isFirst; - - /** - * @param column column name for first/last value. - * @param alias Alias name for output. - * @param isFirst return first value if true. - */ - public FirstLastFunction(@NotNull String column, String alias, boolean isLast) - { - super(column, alias); - isFirst = !isLast; - } - - /** - * Get first/last non null value for column. - */ - @Override - public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception - { - if (rows.size() == 0) { - return null; - } - if (isFirst) { - for (int i = 0; i < rows.size(); i++) { - if (rows.get(i).get(column) != null) { - return rows.get(i).get(column); - } - } - } else { - for (int i = (rows.size() - 1); i >= 0; i--) { - if (rows.get(i).get(column) != null) { - return rows.get(i).get(column); - } - } - } - return null; - } - - /** - * Aggregate output name. - * @return name string. - */ - @Override - protected String aggregateName() - { - if (!StringUtils.isEmpty(alias)) { - return alias; - } - if (isFirst) { - return "FIRST(" + column + ")"; - } - return "LAST(" + column + ")"; - } - - public boolean isFirst() - { - return isFirst; - } - - public void setFirst(boolean isFirst) - { - this.isFirst = isFirst; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java deleted file mode 100644 index 918ca89..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.function; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * A base class for select aggregate function implementation. Subclasses should provide the - implementation for aggregate compute functions. - * <p> - * <br> - * <b>Properties : </b> <br> - * <b>column : </b> Column name for aggregation. - * <b>alias : </b> Output value alias name. - * @displayName Function Index - * @category Stream Manipulators - * @tags sql aggregate - * @since 0.3.4 - */ -public abstract class FunctionIndex -{ - /** - * Column name. - */ - @NotNull - protected String column; - - /** - * Alias name. - */ - protected String alias; - - /** - * @param column Column name for aggregation. - * @param alias Output value alias name. - */ - public FunctionIndex(@NotNull String column, String alias) - { - this.column = column; - this.alias = alias; - } - - /** - * Aggregate compute function, implementation in sub class. - * @param rows Tuple list over application window. - * @return aggregate result object. - */ - public abstract Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; - - /** - * Get aggregate output value name. - * @return name string. - */ - protected abstract String aggregateName(); - - /** - * Apply compute function to given rows and store result in collect by output value name. - * @param rows Tuple list over application window. - */ - public void filter(ArrayList<Map<String, Object>> rows, Map<String, Object> collect) throws Exception - { - if (rows == null) { - return; - } - String name = column; - if (alias != null) { - name = alias; - } - if (name == null) { - name = aggregateName(); - } - collect.put(name, compute(rows)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java deleted file mode 100644 index f02e82c..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.function; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - -import org.apache.commons.lang.StringUtils; - -/** - * An implementation of function index that implements sql max and sql min function semantic. <br> - * <p> - * e.g : sql => SELECT MAX/MIN(column_name) FROM table_name. <br> - * <br> - * <b> Properties : </b> <br> - * <b> column : </b> column name for values max/min computation. <br> - * <b> alias : </b> Alias name for output value. <br> - * <b> isMax : </b> Flag to indicate max/min compute value. <br> - * @displayName Max Min Function - * @category Stream Manipulators - * @tags sql max, sql min - * @since 0.3.4 - */ -public class MaxMinFunction extends FunctionIndex -{ - /** - * Flag to indicate max/min compute value, compute max if true. - */ - private boolean isMax = true; - - /** - * @param column column name for values max/min computation. <br> - * @param alias Alias name for output. <br> - * @param isMax Flag to indicate max/min compute value. <br> - */ - public MaxMinFunction(@NotNull String column, String alias, boolean isMin) - { - super(column, alias); - isMax = !isMin; - } - - /** - * Compute max/min for given column. - * @return max/min value. - */ - @Override - public Object compute(ArrayList<Map<String, Object>> rows) throws Exception - { - double minMax = 0.0; - for (Map<String, Object> row : rows) { - double value = ((Number)row.get(column)).doubleValue(); - if ((isMax && (minMax < value)) || (!isMax && (minMax > value))) { - minMax = value; - } - } - return minMax; - } - - /** - * Aggregate output name. - * @return name string. - */ - @Override - protected String aggregateName() - { - if (!StringUtils.isEmpty(alias)) { - return alias; - } - if (isMax) { - return "MAX(" + column + ")"; - } - return "MIN(" + column + ")"; - } - - public boolean isMax() - { - return isMax; - } - - public void setMax(boolean isMax) - { - this.isMax = isMax; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java deleted file mode 100644 index 02186cd..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.function; - -import java.util.ArrayList; -import java.util.Map; - -import javax.validation.constraints.NotNull; - - - -/** - * <p> An implementation of sql sum function. </p> - * <p> - * @displayName Sum Function - * @category Stream Manipulators - * @tags sql sum, aggregate - * @since 0.3.4 - */ -public class SumFunction extends FunctionIndex -{ - public SumFunction(String column, String alias) throws Exception - { - super(column, alias); - } - - @Override - public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception - { - Double result = 0.0; - for (Map<String, Object> row : rows) { - if (!row.containsKey(column)) { - continue; - } - result += ((Number)row.get(column)).doubleValue(); - } - return result; - } - - @Override - protected String aggregateName() - { - return "Sum(" + column; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java deleted file mode 100644 index 21c1d11..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import javax.validation.constraints.NotNull; - - -/** - * Abstract class to filter row by binary expression index. - * <p> - * Sub class will implement filter/getExpressionName functions. - * @displayName Binary Expression - * @category Stream Manipulators - * @tags alias - * @since 0.3.4 - */ -public abstract class BinaryExpression implements Index -{ - /** - * Left column name argument for expression. - */ - @NotNull - protected String left; - - /** - * Right column name argument for expression. - */ - @NotNull - protected String right; - - /** - * Alias name for output field. - */ - protected String alias; - - /** - * @param left column name argument for expression. - * @param right column name argument for expression. - * @param alias name for output field. - */ - public BinaryExpression(@NotNull String left, @NotNull String right, String alias) - { - this.left = left; - this.right = right; - } - - public String getAlias() - { - return alias; - } - - public void setAlias(String alias) - { - this.alias = alias; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java deleted file mode 100644 index 931ddaa..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * <p>An implementation of Column Index that implements filter method based on mid index. </p> - * <p> - * @displayName Mid Index - * @category Stream Manipulators - * @tags index - * @since 0.3.4 - */ -public class MidIndex extends ColumnIndex -{ - private int start; - private int length = 0; - - public MidIndex(@NotNull String column, String alias, int start) - { - super(column, alias); - assert (start >= 0); - this.start = start; - } - - @Override - public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) - { - if (!row.containsKey(column)) { - return; - } - if (!(row.get(column) instanceof String)) { - assert (false); - } - String name = getColumn(); - if (alias != null) { - name = alias; - } - - int endIndex = start + length; - if ((length == 0) || (endIndex > ((String)row.get(column)).length())) { - collect.put(name, row.get(column)); - } else { - collect.put(name, ((String)row.get(column)).substring(start, endIndex)); - } - } - - public int getLength() - { - return length; - } - - public void setLength(int length) - { - assert (length > 0); - this.length = length; - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java deleted file mode 100644 index 969e3af..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import java.util.Map; - -import javax.validation.constraints.Null; - - -/** - * An implementation of Unary Expression that implements filter method using negate metric sql semantic on column value. - * <p> - * @displayName Negate Expression - * @category Stream Manipulators - * @tags expression, alias - * @since 0.3.4 - */ -public class NegateExpression extends UnaryExpression -{ - - /** - * @param column Name of column value to be negated. - */ - public NegateExpression(@Null String column, String alias) - { - super(column, alias); - if (this.alias == null) { - this.alias = "NEGATE(" + column + ")"; - } - } - - /* (non-Javadoc) - * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, java.util.Map) - */ - @Override - public void filter(Map<String, Object> row, Map<String, Object> collect) - { - if (!row.containsKey(column)) { - return; - } - collect.put(alias, -((Number)row.get(column)).doubleValue()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java deleted file mode 100644 index 90e16a1..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * <p>An implementation of column index that implements filter method using Round Double Index. </p> - * - * @displayName Round Double Index - * @category Stream Manipulators - * @tags alias, maths - * @since 0.3.4 - */ -public class RoundDoubleIndex extends ColumnIndex -{ - private int rounder; - public RoundDoubleIndex(@NotNull String column, String alias, int numDecimals) - { - super(column, alias); - rounder = 1; - if (numDecimals > 0) { - rounder = (int)Math.pow(10, numDecimals); - } - } - - @Override - public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) - { - if (!row.containsKey(column)) { - return; - } - double value = (Double)row.get(column); - value = Math.round(value * rounder) / rounder; - String name = getColumn(); - if (alias != null) { - name = alias; - } - collect.put(name, value); - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java deleted file mode 100644 index 2c49a79..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * <p>An implementation of Column Index that implements filter method using case of a string index. </p> - * - * @displayName String Case Index - * @category Stream Manipulators - * @tags alias - * @since 0.3.4 - */ -public class StringCaseIndex extends ColumnIndex -{ - private boolean toUpperCase = true; - public StringCaseIndex(@NotNull String column, String alias, boolean toLowerCase) - { - super(column, alias); - toUpperCase = !toLowerCase; - } - - @Override - public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) - { - if (!row.containsKey(column)) { - return; - } - if (!(row.get(column) instanceof String)) { - assert (false); - } - - String name = getColumn(); - if (alias != null) { - name = alias; - } - if (toUpperCase) { - collect.put(name, ((String)row.get(column)).toUpperCase()); - } else { - collect.put(name, ((String)row.get(column)).toLowerCase()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java deleted file mode 100644 index 4dbfee1..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - -/** - * <p>An implementation of Column Index that implements filter method using length of a string Index. </p> - * <p> - * @displayName String Length Index - * @category Stream Manipulators - * @tags alias - * @since 0.3.4 - */ -public class StringLenIndex extends ColumnIndex -{ - public StringLenIndex(@NotNull String column, String alias) - { - super(column, alias); - } - - @Override - public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) - { - if (!row.containsKey(column)) { - return; - } - if (!(row.get(column) instanceof String)) { - assert (false); - } - - String name = getColumn(); - if (alias != null) { - name = alias; - } - collect.put(name, ((String)row.get(column)).length()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java deleted file mode 100644 index a0144da..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery.index; - -import java.util.Map; - -import javax.validation.constraints.NotNull; - - -/** - * Implements sum on column index. - * <p> - * Select index class for implementing sum column index. - * @displayName Sum Expression - * @category Stream Manipulators - * @tags sum - * @since 0.3.4 - */ -public class SumExpression extends BinaryExpression -{ - - /** - * @param left column name argument for expression. - * @param right column name argument for expression. - * @param alias name for output field. - */ - public SumExpression(@NotNull String left, @NotNull String right, String alias) - { - super(left, right, alias); - if (this.alias == null) { - this.alias = "SUM(" + left + "," + right + ")"; - } - } - - /* sum column values. - * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, java.util.Map) - */ - @Override - public void filter(Map<String, Object> row, Map<String, Object> collect) - { - if (!row.containsKey(left) || !row.containsKey(right)) { - return; - } - collect.put(alias, ((Number)row.get(left)).doubleValue() + ((Number)row.get(right)).doubleValue()); - } - -}
