http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java deleted file mode 100644 index b0d2e77..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; - -import javax.validation.constraints.Min; - -import org.apache.commons.lang.mutable.MutableDouble; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Operator compares consecutive values arriving at input port mapped by keys, emits <key,percent change> pair on output alert port if percent change exceeds percentage threshold set in operator. 
- * <p> - * StateFull : Yes, current key/value is stored in operator for comparison in - * next successive windows. <br> - * Partition(s): No, base comparison value will be inconsistent across - * instantiated copies. <br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects KeyValPair<K,V extends Number><br> - * <b>alert</b>: emits KeyValPair<K,KeyValPair<V,Double>>(1)<br> - * <br> - * <b>Properties</b>:<br> - * <b>threshold</b>: The threshold of change between consecutive tuples of the - * same key that triggers an alert tuple<br> - * <b>inverse</b>: if set to true the key in the filter will block tuple<br> - * <b>filterBy</b>: List of keys to filter on<br> - * @displayName Change Alert Key Value - * @category Rules and Alerts - * @tags change, key value, numeric, percentage - * @since 0.3.3 - */ -public class ChangeAlertKeyVal<K, V extends Number> extends - BaseNumberKeyValueOperator<K, V> -{ - /** - * Base map is a StateFull field. It is retained across windows - */ - private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>(); - - /** - * Input data port that takes a key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Process each key, compute change or percent, and emit it. 
- */ - @Override - public void process(KeyValPair<K, V> tuple) - { - K key = tuple.getKey(); - double tval = tuple.getValue().doubleValue(); - MutableDouble val = basemap.get(key); - if (!doprocessKey(key)) { - return; - } - if (val == null) { // Only process keys that are in the basemap - val = new MutableDouble(tval); - basemap.put(cloneKey(key), val); - return; - } - double change = tval - val.doubleValue(); - double percent = (change / val.doubleValue()) * 100; - if (percent < 0.0) { - percent = 0.0 - percent; - } - if (percent > percentThreshold) { - KeyValPair<V, Double> dmap = new KeyValPair<V, Double>( - cloneValue(tuple.getValue()), percent); - KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>( - cloneKey(key), dmap); - alert.emit(otuple); - } - val.setValue(tval); - } - }; - - /** - * Key,Percent Change output port. - */ - public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>(); - - /** - * Alert thresh hold percentage set by application. - */ - @Min(1) - private double percentThreshold = 0.0; - - /** - * getter function for threshold value - * - * @return threshold value - */ - @Min(1) - public double getPercentThreshold() - { - return percentThreshold; - } - - /** - * setter function for threshold value - */ - public void setPercentThreshold(double d) - { - percentThreshold = d; - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java deleted file mode 100644 index e212a2d..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import javax.validation.constraints.Min; - -import org.apache.commons.lang.mutable.MutableDouble; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; - -/** - * Operator stores <key,value> pair in hash map across the windows for comparison and emits hash map of <key,percent change in value for each key> if percent change - * exceeds preset threshold. - * <p> - * - * StateFull : Yes, key/value pair in current window are stored for comparison in next window. 
<br> - * Partition : No, will yield wrong result, base value won't be consistent across instances. <br> - * - * <b>Ports</b>:<br> - * <b>data</b>: expects Map<K,V extends Number><br> - * <b>alert</b>: emits HashMap<K,HashMap<V,Double>>(1)<br> - * <br> - * <b>Properties</b>:<br> - * <b>threshold</b>: The threshold of change between consecutive tuples of the same key that triggers an alert tuple<br> - * <b>inverse</b>: if set to true the key in the filter will block tuple<br> - * <b>filterBy</b>: List of keys to filter on<br> - * @displayName Change Alert Map - * @category Rules and Alerts - * @tags change, key value, numeric, percentage, map - * @since 0.3.2 - */ -public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> -{ - /** - * Input data port that takes a map of <key,value>. - */ - public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() - { - /** - * Process each key, compute change or percent, and emits it. - */ - @Override - public void process(Map<K, V> tuple) - { - for (Map.Entry<K, V> e: tuple.entrySet()) { - MutableDouble val = basemap.get(e.getKey()); - if (!doprocessKey(e.getKey())) { - continue; - } - if (val == null) { // Only process keys that are in the basemap - val = new MutableDouble(e.getValue().doubleValue()); - basemap.put(cloneKey(e.getKey()), val); - continue; - } - double change = e.getValue().doubleValue() - val.doubleValue(); - double percent = (change / val.doubleValue()) * 100; - if (percent < 0.0) { - percent = 0.0 - percent; - } - if (percent > percentThreshold) { - HashMap<V,Double> dmap = new HashMap<V,Double>(1); - dmap.put(cloneValue(e.getValue()), percent); - HashMap<K,HashMap<V,Double>> otuple = new HashMap<K,HashMap<V,Double>>(1); - otuple.put(cloneKey(e.getKey()), dmap); - alert.emit(otuple); - } - val.setValue(e.getValue().doubleValue()); - } - } - }; - - // Default "pass through" unifier works as tuple is emitted as pass through - /** - * Output port which 
emits a hashmap of key, percentage change. - */ - public final transient DefaultOutputPort<HashMap<K, HashMap<V,Double>>> alert = new DefaultOutputPort<HashMap<K, HashMap<V,Double>>>(); - - /** - * basemap is a statefull field. It is retained across windows - */ - private HashMap<K,MutableDouble> basemap = new HashMap<K,MutableDouble>(); - @Min(1) - private double percentThreshold = 0.0; - - /** - * getter function for threshold value - * @return threshold value - */ - @Min(1) - public double getPercentThreshold() - { - return percentThreshold; - } - - /** - * setter function for threshold value - */ - public void setPercentThreshold(double d) - { - percentThreshold = d; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java deleted file mode 100644 index 3f77052..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.math; - -import java.util.HashMap; - -import org.apache.commons.lang.mutable.MutableDouble; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Operator compares <key,value> pairs arriving at data and base input ports and stores <key,value> pairs arriving at base port in hash map across the windows. - * <p/> - * The <key,value> pairs that arrive at data port are compared with base value if the key exists in the hash map. - * Change value and percentage are emitted on separate ports. - * StateFull : Yes, base map values are stored across windows. <br> - * Partitions : Yes, values on the base port are replicated across all partitions. However the order of tuples on the - * output stream may change. - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects KeyValPair<K,V extends Number><br> - * <b>base</b>: expects KeyValPair<K,V extends Number><br> - * <b>change</b>: emits KeyValPair<K,V>(1)<br> - * <b>percent</b>: emits KeyValPair<K,Double>(1)<br> - * <br> - * <br> - * <b>Properties</b>:<br> - * <b>inverse</b>: if set to true the key in the filter will block tuple<br> - * <b>filterBy</b>: List of keys to filter on<br> - * - * @displayName Change Key Value - * @category Math - * @tags change, key value - * @since 0.3.3 - */ -public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> -{ - /** - * basemap is a stateful field. It is retained across windows - */ - private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>(); - - /** - * Input data port that takes key value pairs. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Process each key, compute change or percent, and emit it. 
- */ - @Override - public void process(KeyValPair<K, V> tuple) - { - K key = tuple.getKey(); - if (!doprocessKey(key)) { - return; - } - MutableDouble bval = basemap.get(key); - if (bval != null) { // Only process keys that are in the basemap - double cval = tuple.getValue().doubleValue() - bval.doubleValue(); - change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval))); - percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100)); - } - } - }; - - /** - * Base value input port, stored in base map for comparison. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> base = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Process each key to store the value. If same key appears again update - * with latest value. - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - if (tuple.getValue().doubleValue() != 0.0) { // Avoid divide by zero, Emit - // an error tuple? - MutableDouble val = basemap.get(tuple.getKey()); - if (val == null) { - val = new MutableDouble(0.0); - basemap.put(cloneKey(tuple.getKey()), val); - } - val.setValue(tuple.getValue().doubleValue()); - } - } - }; - - /** - * Key, Change output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, V>> change = new DefaultOutputPort<KeyValPair<K, V>>(); - - /** - * Key, Percentage Change pair output port. 
- */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Double>> percent = new DefaultOutputPort<KeyValPair<K, Double>>(); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java deleted file mode 100644 index 66bd7da..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.algo.MatchMap; -import com.datatorrent.lib.util.UnifierHashMap; - -/** - * Operator compares based on the property "key", "value", and "compare". 
- * <p> - * The comparison is done by getting double value from the Number. - * Passed tuples are emitted on the output port "compare". - * Failed tuples are emitted on port "except". - * Both output ports are optional, but at least one has to be connected. - * This module is a pass through<br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects Map<K,V><br> - * <b>compare</b>: emits HashMap<K,V><br> - * <b>except</b>: emits HashMap<K,V><br> - * <br> - * <b>Properties</b>:<br> - * <b>key</b>: The key on which compare is done<br> - * <b>value</b>: The value to compare with<br> - * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br> - * <br> - * Compile time checks<br> - * Key must be non empty<br> - * Value must be able to convert to a "double"<br> - * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br> - * <b>Specific run time checks</b>:<br> - * Does the incoming HashMap have the key<br> - * Is the value of the key a number<br> - * <p> - * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br> - * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareExceptMap<K,V extends Number> operator template"> - * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr> - * <tr><td><b>5 Million K,V pairs/s</b></td><td>Each tuple is emitted if emitError is set to true</td><td>In-bound rate determines performance as every tuple is emitted. - * Immutable tuples were used in the benchmarking. 
If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr> - * </table><br> - * <p> - * <b>Function Table (K=String, V=Integer); emitError=true; key=a; value=3; cmp=eq)</b>: - * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareExceptMap<K,V extends Number> operator template"> - * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th colspan=2>Out-bound (emit)</th></tr> - * <tr><th><i>data</i>(HashMap<K,V>)</th><th><i>compare</i>(HashMap<K,V>)</th><th><i>except</i>(HashMap<K,V>)</th></tr> - * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr> - * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td><td>{a=2,b=20,c=1000}</td></tr> - * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td><td></td></tr> - * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td><td>{a=10,b=5}</td></tr> - * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td><td>{d=55,b=12}</td></tr> - * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td><td>{d=22,a=4}</td></tr> - * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td><td></td></tr> - * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr> - * </table> - * <br> - * <br> - * @displayName Compare Except Map - * @category Math - * @tags comparison, key value, number, hash map - * @since 0.3.2 - */ -@Stateless -public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V> -{ - /** - * Output port that emits a hashmap of matched tuples after comparison. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, V>> compare = match; - - /** - * Output port that emits a hashmap of non matching tuples after comparison. 
- */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>() - { - @Override - public Unifier<HashMap<K, V>> getUnifier() - { - return new UnifierHashMap<K, V>(); - } - }; - - /** - * Emits if compare port is connected - * @param tuple - */ - @Override - public void tupleMatched(Map<K, V> tuple) - { - if (compare.isConnected()) { - compare.emit(cloneTuple(tuple)); - } - } - - /** - * Emits if except port is connected - * @param tuple - */ - @Override - public void tupleNotMatched(Map<K, V> tuple) - { - if (except.isConnected()) { - except.emit(cloneTuple(tuple)); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CompareMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java deleted file mode 100644 index 3636207..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.math; - -import java.util.HashMap; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.algo.MatchMap; - -/** - * This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted. - * <p> - * If the tuple passed the test, it is emitted on the output port "compare". The comparison is done by getting double value from the Number. - * Both output ports are optional, but at least one has to be connected. - * This module is a pass through<br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects Map<K,V extends Number><br> - * <b>compare</b>: emits HashMap<K,V><br> - * <br> - * <b>Properties</b>:<br> - * <b>key</b>: The key on which compare is done<br> - * <b>value</b>: The value to compare with<br> - * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br> - * <br> - * <b>Compile time checks</b>:<br> - * Key must be non empty<br> - * Value must be able to convert to a "double"<br> - * CompareMap string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br> - * <br> - * <b>Specific run time checks</b>:<br> - * Does the incoming HashMap have the key<br> - * Is the value of the key a number<br> - * <p> - * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br> - * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareMap<K,V extends Number> operator template"> - * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr> - * <tr><td><b>8 Million K,V pairs/s</b></td><td>Each matched tuple is emitted</td><td>In-bound rate and number of tuples that match determine performance. - * Immutable tuples were used in the benchmarking. 
If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr> - * </table><br> - * <p> - * <b>Function Table (K=String,V=Integer); emitError=true; key=a; value=3; cmp=eq)</b>: - * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareMap<K,V extends Number> operator template"> - * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr> - * <tr><th><i>data</i>(Map<K,V>)</th><th><i>compare</i>(HashMap<K,V>)</th></tr> - * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr> - * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td></tr> - * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td></tr> - * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td></tr> - * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td></tr> - * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td></tr> - * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td></tr> - * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td></tr> - * </table> - * <br> - * <br> - * @displayName Compare Map - * @category Math - * @tags comparison, key value, numeric, map - * @since 0.3.2 - */ -@Stateless -public class CompareMap<K, V extends Number> extends MatchMap<K,V> -{ - /** - * Output port that emits a hashmap of matching number tuples after comparison. 
- */ - public final transient DefaultOutputPort<HashMap<K, V>> compare = match; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java deleted file mode 100644 index d593020..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang.mutable.MutableInt; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.util.BaseKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.lib.util.UnifierCountOccurKey; - -/** - * This Operator aggregates occurrence of keys in <key,value> pair at input port.<Key,Occurrence count> pair is emitted for each input on output port. - * <p> - * <br> - * StateFull : Yes, key occurrence is aggregated over windows. <br> - * Partitions : Yes, count occurrence unifier at output port. <br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects KeyValPair<K,V><br> - * <b>count</b>: emits KeyValPair<K,Integer></b><br> - * <br> - * @displayName Count Key Value - * @category Math - * @tags count, key value, aggregate - * @since 0.3.3 - */ -public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V> -{ - - /** - * Key occurrence count map. - */ - protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>(); - - /** - * Input data port that takes key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * For each tuple (a key value pair): Adds the values for each key, Counts - * the number of occurrence of each key - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - K key = tuple.getKey(); - MutableInt count = counts.get(key); - if (count == null) { - count = new MutableInt(0); - counts.put(cloneKey(key), count); - } - count.increment(); - } - - @Override - public StreamCodec<KeyValPair<K, V>> getStreamCodec() - { - return getKeyValPairStreamCodec(); - } - }; - - /** - * Key, occurrence value pair output port. 
- */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>() - { - @Override - public UnifierCountOccurKey<K> getUnifier() - { - return new UnifierCountOccurKey<K>(); - } - }; - - /** - * Emits on all ports that are connected. Data is computed during process on - * input port and endWindow just emits it for each key. Clears the internal - * data if resetAtEndWindow is true. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public void endWindow() - { - for (Map.Entry<K, MutableInt> e : counts.entrySet()) { - count.emit(new KeyValPair(e.getKey(), - new Integer(e.getValue().intValue()))); - } - counts.clear(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java deleted file mode 100644 index ddef880..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.algo.MatchMap; -import com.datatorrent.lib.util.UnifierHashMap; - -/** - * This operator does comparison on tuple sub-classed from Number based on the property "key", "value", and "cmp", and not matched tuples are emitted. - * <p> - * The comparison is done by getting double value from the Number. Both output ports - * are optional, but at least one has to be connected - * <p> - * This module is a pass through<br> - * <br> - * <br> - * StateFull : No, output is emitted in current window. <br> - * Partitions : Yes, No state dependency among input tuples. <br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects Map<K,V extends Number><br> - * <b>except</b>: emits HashMap<K,V><br> - * <br> - * <b>Properties</b>:<br> - * <b>key</b>: The key on which compare is done<br> - * <b>value</b>: The value to compare with<br> - * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", - * "neq", "gt", "gte". Default is "eq"<br> - * <br> - * <b>Compile time checks</b>:<br> - * Key must be non empty<br> - * Value must be able to convert to a "double"<br> - * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", - * "gte"<br> - * <br> - * <b>Run time checks</b>:<br> - * Does the incoming HashMap have the key, Is the value of the key a number<br> - * <br> - * @displayName Except Map - * @category Math - * @tags comparison, Number - * @since 0.3.3 - */ -@Stateless -public class ExceptMap<K, V extends Number> extends MatchMap<K, V> -{ - /** - * Output port that emits non matching number tuples. 
- */ - public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>() - { - @Override - public Unifier<HashMap<K, V>> getUnifier() - { - return new UnifierHashMap<K, V>(); - } - }; - - /** - * Does nothing. Overrides base as call super.tupleMatched() would emit the - * tuple - * - * @param tuple - */ - @Override - public void tupleMatched(Map<K, V> tuple) - { - } - - /** - * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override - * in case objects are mutable - * - * @param tuple - */ - @Override - public void tupleNotMatched(Map<K, V> tuple) - { - except.emit(cloneTuple(tuple)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/Quotient.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java b/library/src/main/java/com/datatorrent/lib/math/Quotient.java deleted file mode 100644 index ed08e86..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.math; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OperatorAnnotation; -import com.datatorrent.lib.util.BaseNumberValueOperator; - -/** - * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window. - * <p> - * <br> - * <b>StateFull : Yes </b>, Sum of values is taken over application window. <br> - * <b>Partitions : No </b>, will yield wrong results, since values are - * accumulated over application window. <br> - * <br> - * <b>Ports</b>:<br> - * <b>numerator</b>: expects V extends Number<br> - * <b>denominator</b>: expects V extends Number<br> - * <b>quotient</b>: emits Double<br> - * <br> - * <b>Properties : </b> <br> - * <b>mult_by : </b>Multiply by value(default = 1). <br> - * <br> - * @displayName Quotient - * @category Math - * @tags division, sum, numeric - * @since 0.3.3 - */ -@OperatorAnnotation(partitionable = false) -public class Quotient<V extends Number> extends BaseNumberValueOperator<V> -{ - protected double nval = 0.0; - protected double dval = 0.0; - int mult_by = 1; - - /** - * Numerator values input port. - */ - public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>() - { - /** - * Adds to the numerator value - */ - @Override - public void process(V tuple) - { - nval += tuple.doubleValue(); - } - }; - - /** - * Denominator values input port. - */ - public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>() - { - /** - * Adds to the denominator value - */ - @Override - public void process(V tuple) - { - dval += tuple.doubleValue(); - } - }; - - /** - * Quotient output port. - */ - public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>(); - - public void setMult_by(int i) - { - mult_by = i; - } - - /** - * Generates tuple emits it as long as denominator is not 0. 
Clears internal - * data - */ - @Override - public void endWindow() - { - if (dval == 0) { - return; - } - double val = (nval / dval) * mult_by; - quotient.emit(getValue(val)); - nval = 0.0; - dval = 0.0; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java deleted file mode 100644 index a10fe95..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import javax.validation.constraints.Min; - -import org.apache.commons.lang.mutable.MutableDouble; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OperatorAnnotation; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; - -/** - * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator. - * <p> - * <br> - * Application can set multiplication value for quotient(default = 1). <br> - * Operator will calculate quotient of occurrence of key in numerator divided by - * occurrence of key in denominator if countKey flag is true. <br> - * Application can allow or block keys by setting filter key and inverse flag. <br> - * <br> - * <b>StateFull : Yes</b>, numerator/denominator values are summed over - * application window. <br> - * <b>Partitions : No, </b>, will yield wrong results, since values are summed - * over app window. <br> - * <br> - * <b>Ports</b>:<br> - * <b>numerator</b>: expects Map<K,V extends Number><br> - * <b>denominator</b>: expects Map<K,V extends Number><br> - * <b>quotient</b>: emits HashMap<K,Double><br> - * <br> - * <b>Properties</b>:<br> - * <b>inverse :</b> if set to true the key in the filter will block tuple<br> - * <b>filterBy :</b> List of keys to filter on<br> - * <b>countkey :</b> Get quotient of occurrence of keys in numerator and - * denominator. <br> - * <b>mult_by :</b> Set multiply by constant value. <br> - * <br> - * @displayName Quotient Map - * @category Math - * @tags division, sum, map - * @since 0.3.3 - */ -@OperatorAnnotation(partitionable = false) -public class QuotientMap<K, V extends Number> extends - BaseNumberKeyValueOperator<K, V> -{ - /** - * Numerator key/sum value map. 
- */ - protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>(); - - /** - * Denominator key/sum value map. - */ - protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>(); - - /** - * Count occurrence of keys if set to true. - */ - boolean countkey = false; - - /** - * Quotient multiply by value. - */ - int mult_by = 1; - - /** - * Numerator input port. - */ - public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>() - { - /** - * Added tuple to the numerator hash - */ - @Override - public void process(Map<K, V> tuple) - { - addTuple(tuple, numerators); - } - }; - - /** - * Denominator input port. - */ - public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>() - { - /** - * Added tuple to the denominator hash - */ - @Override - public void process(Map<K, V> tuple) - { - addTuple(tuple, denominators); - } - }; - - /** - * Quotient output port. - */ - public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>(); - - /** - * Add tuple to nval/dval map. - * - * @param tuple - * key/value map on input port. - * @param map - * key/summed value map. - */ - public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map) - { - for (Map.Entry<K, V> e : tuple.entrySet()) { - addEntry(e.getKey(), e.getValue(), map); - } - } - - /** - * Add/Update entry to key/sum value map. - * - * @param key - * name. - * @param value - * value for key. - * @param map - * numerator/denominator key/sum map. 
- */ - public void addEntry(K key, V value, Map<K, MutableDouble> map) - { - if (!doprocessKey(key) || (value == null)) { - return; - } - MutableDouble val = map.get(key); - if (val == null) { - if (countkey) { - val = new MutableDouble(1.00); - } else { - val = new MutableDouble(value.doubleValue()); - } - } else { - if (countkey) { - val.increment(); - } else { - val.add(value.doubleValue()); - } - } - map.put(cloneKey(key), val); - } - - /** - * getter for mult_by - * - * @return mult_by - */ - - @Min(0) - public int getMult_by() - { - return mult_by; - } - - /** - * getter for countkey - * - * @return countkey - */ - public boolean getCountkey() - { - return countkey; - } - - /** - * Setter for mult_by - * - * @param i - */ - public void setMult_by(int i) - { - mult_by = i; - } - - /** - * setter for countkey - * - * @param i - * sets countkey - */ - public void setCountkey(boolean i) - { - countkey = i; - } - - /** - * Generates tuples for each key and emits them. Only keys that are in the - * denominator are iterated on If the key is only in the numerator, it gets - * ignored (cannot do divide by 0) Clears internal data - */ - @Override - public void endWindow() - { - HashMap<K, Double> tuples = new HashMap<K, Double>(); - for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) { - MutableDouble nval = numerators.get(e.getKey()); - if (nval == null) { - tuples.put(e.getKey(), new Double(0.0)); - } else { - tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue() - .doubleValue()) * mult_by)); - } - } - if (!tuples.isEmpty()) { - quotient.emit(tuples); - } - numerators.clear(); - denominators.clear(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java 
b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java deleted file mode 100644 index c2d8465..0000000 --- a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.math; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang.mutable.MutableDouble; -import org.apache.commons.lang.mutable.MutableInt; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.UnifierHashMapInteger; -import com.datatorrent.lib.util.UnifierHashMapSumKeys; - -/** - * Emits the sum and count of values for each key at the end of window. - * <p> - * Application accumulate sum across streaming window by setting cumulative flag - * to true. <br> - * This is an end of window operator<br> - * <br> - * <b>StateFull : Yes</b>, Sum is computed over application window and streaming - * window. <br> - * <b>Partitions : Yes</b>, Sum is unified at output port. 
<br> - * <br> - * <b>Ports</b>:<br> - * <b>data</b>: expects Map<K,V extends Number><br> - * <b>sum</b>: emits HashMap<K,V><br> - * <b>count</b>: emits HashMap<K,Integer></b><br> - * <br> - * <b>Properties</b>:<br> - * <b>inverse</b>: if set to true the key in the filter will block tuple<br> - * <b>filterBy</b>: List of keys to filter on<br> - * <b>cumulative</b>: boolean flag, if set the sum is not cleared at the end of - * window, <br> - * hence generating cumulative sum across streaming windows. Default is false.<br> - * <br> - * @displayName Sum Count Map - * @category Math - * @tags number, sum, counting, map - * @since 0.3.3 - */ -public class SumCountMap<K, V extends Number> extends - BaseNumberKeyValueOperator<K, V> -{ - /** - * Key/double sum map. - */ - protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>(); - - /** - * Key/integer sum map. - */ - protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>(); - - /** - * Cumulative sum flag. - */ - protected boolean cumulative = false; - - /** - * Input port that takes a map. It adds the values for each key and counts the number of occurrences for each key. 
- */ - public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() - { - /** - * For each tuple (a HashMap of keys,val pairs) Adds the values for each - * key, Counts the number of occurrences of each key - */ - @Override - public void process(Map<K, V> tuple) - { - for (Map.Entry<K, V> e : tuple.entrySet()) { - K key = e.getKey(); - if (!doprocessKey(key)) { - continue; - } - if (sum.isConnected()) { - MutableDouble val = sums.get(key); - if (val == null) { - val = new MutableDouble(e.getValue().doubleValue()); - } else { - val.add(e.getValue().doubleValue()); - } - sums.put(cloneKey(key), val); - } - if (SumCountMap.this.count.isConnected()) { - MutableInt count = counts.get(key); - if (count == null) { - count = new MutableInt(0); - counts.put(cloneKey(key), count); - } - count.increment(); - } - } - } - }; - - /** - * Key,sum map output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>() - { - @Override - public Unifier<HashMap<K, V>> getUnifier() - { - return new UnifierHashMapSumKeys<K, V>(); - } - }; - - /** - * Key,double sum map output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>() - { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Unifier<HashMap<K, Double>> getUnifier() - { - UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>(); - ret.setType(Double.class); - return ret; - } - }; - - /** - * Key,integer sum output port. 
- */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>() - { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Unifier<HashMap<K, Integer>> getUnifier() - { - UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>(); - ret.setType(Integer.class); - return ret; - } - }; - - - /** - * Key,long sum output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>() - { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Unifier<HashMap<K, Long>> getUnifier() - { - UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>(); - ret.setType(Long.class); - return ret; - } - }; - - /** - * Key,short sum output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>() - { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Unifier<HashMap<K, Short>> getUnifier() - { - UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>(); - ret.setType(Short.class); - return ret; - } - }; - - /** - * Key,float sum output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>() - { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Unifier<HashMap<K, Float>> getUnifier() - { - UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>(); - ret.setType(Float.class); - return ret; - } - }; - - /** - * Key,integer sum output port. 
- */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>() - { - @Override - public Unifier<HashMap<K, Integer>> getUnifier() - { - return new UnifierHashMapInteger<K>(); - } - }; - - /** - * Get cumulative flag. - * - * @return cumulative flag - */ - public boolean isCumulative() - { - return cumulative; - } - - /** - * set cumulative flag. - * - * @param cumulative - * input flag - */ - public void setCumulative(boolean cumulative) - { - this.cumulative = cumulative; - } - - /** - * Emits on all ports that are connected. Data is precomputed during process - * on input port endWindow just emits it for each key Clears the internal data - * before return - */ - @Override - public void endWindow() - { - - // Should allow users to send each key as a separate tuple to load balance - // This is an aggregate node, so load balancing would most likely not be - // needed - - HashMap<K, V> tuples = new HashMap<K, V>(); - HashMap<K, Integer> ctuples = new HashMap<K, Integer>(); - HashMap<K, Double> dtuples = new HashMap<K, Double>(); - HashMap<K, Integer> ituples = new HashMap<K, Integer>(); - HashMap<K, Float> ftuples = new HashMap<K, Float>(); - HashMap<K, Long> ltuples = new HashMap<K, Long>(); - HashMap<K, Short> stuples = new HashMap<K, Short>(); - - for (Map.Entry<K, MutableDouble> e : sums.entrySet()) { - K key = e.getKey(); - MutableDouble val = e.getValue(); - tuples.put(key, getValue(val.doubleValue())); - dtuples.put(key, val.doubleValue()); - ituples.put(key, val.intValue()); - ftuples.put(key, val.floatValue()); - ltuples.put(key, val.longValue()); - stuples.put(key, val.shortValue()); - // ctuples.put(key, counts.get(e.getKey()).toInteger()); - MutableInt c = counts.get(e.getKey()); - if (c != null) { - ctuples.put(key, c.toInteger()); - } - } - - sum.emit(tuples); - sumDouble.emit(dtuples); - sumInteger.emit(ituples); - sumLong.emit(ltuples); - 
sumShort.emit(stuples); - sumFloat.emit(ftuples); - count.emit(ctuples); - clearCache(); - } - - /** - * Clear sum maps. - */ - private void clearCache() - { - if (!cumulative) { - sums.clear(); - counts.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java deleted file mode 100644 index e3bba8a..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.streamquery; - -import java.util.ArrayList; -import java.util.HashMap; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; - -/** - * A base implementation of a BaseOperator that is a sql stream operator. Subclasses should provide the - implementation of how to process the tuples. - * <p> - * Abstract sql db input operator. - * <p> - * @displayName Abstract Sql Stream - * @category Stream Manipulators - * @tags sql operator - * @since 0.3.2 - */ -public abstract class AbstractSqlStreamOperator extends BaseOperator -{ - public static class InputSchema - { - public static class ColumnInfo - { - public String type; - public int bindIndex = 0; - public boolean isColumnIndex = false; - } - - /** - * the name of the input "table" - */ - public String name; - /** - * key is the name of the column, and value is the SQL type - */ - public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>(); - - public InputSchema() - { - } - - public InputSchema(String name) - { - this.name = name; - } - - public void setColumnInfo(String columnName, String columnType, boolean isColumnIndex) - { - ColumnInfo t = new ColumnInfo(); - t.type = columnType; - t.isColumnIndex = isColumnIndex; - columnInfoMap.put(columnName, t); - } - - } - - protected String statement; - protected ArrayList<InputSchema> inputSchemas = new ArrayList<InputSchema>(5); - protected transient ArrayList<Object> bindings; - - /** - * Input bindings port that takes an arraylist of objects. 
- */ - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<ArrayList<Object>> bindingsPort = new DefaultInputPort<ArrayList<Object>>() - { - @Override - public void process(ArrayList<Object> tuple) - { - bindings = tuple; - } - - }; - - /** - * Input port in1 that takes a hashmap of <string,object>. - */ - public final transient DefaultInputPort<HashMap<String, Object>> in1 = new DefaultInputPort<HashMap<String, Object>>() - { - @Override - public void process(HashMap<String, Object> tuple) - { - processTuple(0, tuple); - } - - }; - - /** - * Input port in2 that takes a hashmap of <string,object>. - */ - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<HashMap<String, Object>> in2 = new DefaultInputPort<HashMap<String, Object>>() - { - @Override - public void process(HashMap<String, Object> tuple) - { - processTuple(1, tuple); - } - - }; - - /** - * Input port in3 that takes a hashmap of <string,object>. - */ - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<HashMap<String, Object>> in3 = new DefaultInputPort<HashMap<String, Object>>() - { - @Override - public void process(HashMap<String, Object> tuple) - { - processTuple(2, tuple); - } - - }; - - /** - * Input port in4 that takes a hashmap of <string,object>. - */ - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<HashMap<String, Object>> in4 = new DefaultInputPort<HashMap<String, Object>>() - { - @Override - public void process(HashMap<String, Object> tuple) - { - processTuple(3, tuple); - } - - }; - - /** - * Input port in5 that takes a hashmap of <string,object>. 
- */ - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<HashMap<String, Object>> in5 = new DefaultInputPort<HashMap<String, Object>>() - { - @Override - public void process(HashMap<String, Object> tuple) - { - processTuple(4, tuple); - } - - }; - - /** - * Output result port that emits a hashmap of <string,object>. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<HashMap<String, Object>> result = new DefaultOutputPort<HashMap<String, Object>>(); - - public void setStatement(String statement) - { - this.statement = statement; - } - - public String getStatement() - { - return this.statement; - } - - public void setInputSchema(int inputPortIndex, InputSchema inputSchema) - { - inputSchemas.add(inputPortIndex, inputSchema); - } - - public abstract void processTuple(int tableNum, HashMap<String, Object> tuple); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java deleted file mode 100644 index 77c7522..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.streamquery; - -import java.util.Map; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.streamquery.condition.Condition; - -/** - * An implementation of BaseOperator that provides sql delete query semantic on live data stream. <br> - * <p> - * Stream rows passing condition are emitted on output port stream. <br> - * <br> - * <b>StateFull : NO,</b> all row data is processed in current time window. <br> - * <b>Partitions : Yes, </b> No Input dependency among input rows. <br> - * <br> - * <b>Ports</b>:<br> - * <b> inport : </b> Input hash map(row) port, expects - * HashMap<String,Object><<br> - * <b> outport : </b> Output hash map(row) port, emits - * HashMap<String,Object><br> - * <br> - * <b> Properties : <b> <br> - * <b> condition : </b> Select condition for selecting rows. <br> - * <b> columns : </b> Column names/aggregate functions for select. <br> - * <br> - * @displayName Delete - * @category Stream Manipulators - * @tags sql delete operator - * @since 0.3.3 - */ -public class DeleteOperator extends BaseOperator -{ - - /** - * condition. - */ - private Condition condition = null; - - /** - * set condition. - */ - public void setCondition(Condition condition) - { - this.condition = condition; - } - - /** - * Input port that takes a map of <string,object>. 
- */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() - { - - @Override - public void process(Map<String, Object> tuple) - { - if ((condition != null) && (!condition.isValidRow(tuple))) { - outport.emit(tuple); - } - } - }; - - /** - * Output port emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>(); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java deleted file mode 100644 index 2fe8bc3..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package com.datatorrent.lib.streamquery;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;

/**
 * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator.
 * <p>
 * During setup one session-scoped temporary table is declared for every input schema that has
 * columns. Incoming tuples are inserted into the table of their input. At the end of each window
 * the pending inserts are committed, the registered exec statements are run, every row they
 * return is emitted through {@code result}, and the temporary tables are emptied again.
 *
 * @displayName Derby Sql Stream
 * @category Stream Manipulators
 * @tags sql, in-memory, input operator
 * @since 0.3.2
 */
public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
{
  /** Insert statement per input table; {@code null} where no table was created for that input. */
  protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5);
  /** SQL text of the statements to run at the end of every window. */
  protected List<String> execStmtStringList = new ArrayList<String>();
  /** Prepared form of {@link #execStmtStringList}, built in {@link #setup}. */
  protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5);
  /** Delete statement per input table; {@code null} where no table was created for that input. */
  protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5);
  /** Connection to the embedded in-memory database. */
  protected transient Connection db;

  /**
   * Registers a statement to be executed at the end of every window.
   *
   * @param stmt SQL text; it is prepared in {@link #setup}
   */
  public void addExecStatementString(String stmt)
  {
    this.execStmtStringList.add(stmt);
  }


  /**
   * Loads the embedded Derby driver, opens the in-memory database, declares the temporary tables
   * and prepares the insert, delete and exec statements.
   *
   * @param context operator context (not used here)
   * @throws RuntimeException wrapping any driver-loading or SQL failure
   */
  @Override
  public void setup(OperatorContext context)
  {
    System.setProperty("derby.stream.error.file", "/dev/null");
    try {
      Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }

    String connUrl = "jdbc:derby:memory:MALHAR_TEMP;create=true";

    try {
      db = DriverManager.getConnection(connUrl);
      // create the temporary tables here
      for (int i = 0; i < inputSchemas.size(); i++) {
        InputSchema inputSchema = inputSchemas.get(i);
        if (inputSchema == null || inputSchema.columnInfoMap.isEmpty()) {
          // Keep list positions equal to table numbers: without a placeholder the next
          // add(i, ...) would be past the end of the list and throw IndexOutOfBoundsException.
          insertStatements.add(i, null);
          deleteStatements.add(i, null);
          continue;
        }
        StringBuilder columnSpec = new StringBuilder();
        StringBuilder columnNames = new StringBuilder();
        StringBuilder insertQuestionMarks = new StringBuilder();
        int j = 0;
        for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) {
          if (columnSpec.length() > 0) {
            columnSpec.append(",");
            columnNames.append(",");
            insertQuestionMarks.append(",");
          }
          columnSpec.append(entry.getKey()).append(" ").append(entry.getValue().type);
          columnNames.append(entry.getKey());
          insertQuestionMarks.append("?");
          // remember which "?" of the insert statement belongs to this column (1-based)
          entry.getValue().bindIndex = ++j;
        }
        String createTempTableStmt =
            "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
        PreparedStatement st = db.prepareStatement(createTempTableStmt);
        try {
          st.execute();
        } finally {
          // release the statement even when the table declaration fails
          st.close();
        }

        String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES ("
            + insertQuestionMarks + ")";

        insertStatements.add(i, db.prepareStatement(insertStmt));
        deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." + inputSchema.name));
      }
      for (String stmtStr : execStmtStringList) {
        execStatements.add(db.prepareStatement(stmtStr));
      }
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Turns auto-commit off so that the inserts of the window are committed together in
   * {@link #endWindow}.
   *
   * @param windowId id of the window that starts (not used here)
   */
  @Override
  public void beginWindow(long windowId)
  {
    try {
      db.setAutoCommit(false);
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Inserts one tuple into the temporary table of the given input. Only keys that are columns
   * of the input schema are bound; values are bound through their {@code toString()} form and
   * a {@code null} value is bound as SQL NULL.
   *
   * @param tableNum position of the input (and of its schema) the tuple arrived on
   * @param tuple column name to value
   * @throws IllegalStateException if no table was created for {@code tableNum}
   * @throws RuntimeException wrapping any SQL failure
   */
  @Override
  public void processTuple(int tableNum, HashMap<String, Object> tuple)
  {
    InputSchema inputSchema = inputSchemas.get(tableNum);

    PreparedStatement insertStatement = insertStatements.get(tableNum);
    if (insertStatement == null) {
      throw new IllegalStateException(
          "No table was created for input " + tableNum + ": its schema is missing or has no columns");
    }
    try {
      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
        ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey());
        if (t != null && t.bindIndex != 0) {
          Object value = entry.getValue();
          insertStatement.setString(t.bindIndex, value == null ? null : value.toString());
        }
      }

      insertStatement.executeUpdate();
      insertStatement.clearParameters();
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Commits the inserts of the window, applies the current bindings to every exec statement,
   * runs the exec statements and emits their rows, then empties the temporary tables.
   * The bindings are cleared afterwards.
   *
   * @throws RuntimeException wrapping any SQL failure
   */
  @Override
  public void endWindow()
  {
    try {
      db.commit();
      if (bindings != null) {
        for (int i = 0; i < bindings.size(); i++) {
          Object binding = bindings.get(i);
          String value = binding == null ? null : binding.toString();
          for (PreparedStatement stmt : execStatements) {
            // JDBC parameter positions start at 1 while list positions start at 0
            stmt.setString(i + 1, value);
          }
        }
      }

      for (PreparedStatement stmt : execStatements) {
        executePreparedStatement(stmt);
      }
      for (PreparedStatement st : deleteStatements) {
        if (st == null) {
          // placeholder of an input without a table
          continue;
        }
        st.executeUpdate();
        st.clearParameters();
      }
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
    bindings = null;
  }

  /**
   * Runs a query and emits each returned row as a map of column name to value.
   *
   * @param statement the prepared query; its parameters are cleared after a successful run
   * @throws SQLException if the query or reading its result fails
   */
  private void executePreparedStatement(PreparedStatement statement) throws SQLException
  {
    ResultSet res = statement.executeQuery();
    try {
      ResultSetMetaData resmeta = res.getMetaData();
      int columnCount = resmeta.getColumnCount();
      while (res.next()) {
        HashMap<String, Object> resultRow = new HashMap<String, Object>();
        for (int i = 1; i <= columnCount; i++) {
          resultRow.put(resmeta.getColumnName(i), res.getObject(i));
        }
        this.result.emit(resultRow);
      }
    } finally {
      // the statement is reused every window, so its result set must not be left open
      res.close();
    }
    statement.clearParameters();
  }

  /**
   * Closes the database connection.
   *
   * @throws RuntimeException wrapping any SQL failure
   */
  @Override
  public void teardown()
  {
    try {
      db.close();
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java deleted file mode 100644 index 1821953..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package com.datatorrent.lib.streamquery;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import javax.validation.constraints.NotNull;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.HavingCondition;
import com.datatorrent.lib.streamquery.function.FunctionIndex;
import com.datatorrent.lib.streamquery.index.ColumnIndex;

/**
 * An implementation of BaseOperator that provides sql group by querying semantics on live data stream. <br>
 * <p>
 * Rows that satisfy the select condition are buffered while the window is open. At the end of
 * the window they are grouped by the group by columns, the aggregate functions are applied to
 * every group, and one map of column name(s) and aggregate alias(es) is emitted per group. <br>
 * If having conditions are specified, a group is emitted only when all of them hold for it. <br>
 * <br>
 * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
 * <b>Partitions : No, </b> will yield wrong result(s). <br>
 * <br>
 * <b>Ports</b>:<br>
 * <b> inport : </b> Input row port, expects {@code Map<String,Object>}<br>
 * <b> outport : </b> Output row port, emits {@code Map<String,Object>}<br>
 * <br>
 * <b> Properties : </b> <br>
 * <b> condition : </b> Select condition; rows that do not satisfy it are dropped. <br>
 * <b> columnGroupIndexes : </b> Group by names list. <br>
 * <b> aggregates : </b> Aggregate function indexes. <br>
 * <b> havingConditions : </b> Having filter conditions for aggregate(s). <br>
 * <br>
 * @displayName GroupBy Having Operator
 * @category Stream Manipulators
 * @tags sql, groupby operator, condition, index
 * @since 0.3.4
 */
@OperatorAnnotation(partitionable = false)
public class GroupByHavingOperator extends BaseOperator
{

  /**
   * aggregate indexes.
   */
  private ArrayList<FunctionIndex> aggregates = new ArrayList<FunctionIndex>();

  /**
   * Column, Group by names
   */
  private ArrayList<ColumnIndex> columnGroupIndexes = new ArrayList<ColumnIndex>();

  /**
   * where condition.
   */
  private Condition condition;

  /**
   * having aggregate condition.
   */
  private ArrayList<HavingCondition> havingConditions = new ArrayList<HavingCondition>();

  /**
   * Rows accepted during the current window.
   */
  private ArrayList<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();

  /**
   * Adds an aggregate function to apply to every group.
   *
   * @param index aggregate function index
   */
  public void addAggregateIndex(@NotNull FunctionIndex index)
  {
    aggregates.add(index);
  }

  /**
   * Adds a column to group by.
   *
   * @param index group by column index
   */
  public void addColumnGroupByIndex(@NotNull ColumnIndex index)
  {
    columnGroupIndexes.add(index);
  }

  /**
   * Adds a having condition that a group must satisfy to be emitted.
   *
   * @param condition having condition
   */
  public void addHavingCondition(@NotNull HavingCondition condition)
  {
    havingConditions.add(condition);
  }

  /**
   * @param condition select condition; {@code null} accepts every row
   */
  public void setCondition(Condition condition)
  {
    this.condition = condition;
  }

  /**
   * Input port that takes a map of &lt;string,object&gt;. Rows rejected by the select condition
   * are dropped, all others are kept until the end of the window.
   */
  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
  {

    @Override
    public void process(Map<String, Object> tuple)
    {
      if ((condition != null) && (!condition.isValidRow(tuple))) {
        return;
      }
      rows.add(tuple);
    }
  };

  /**
   * Output port that emits a map of &lt;string,object&gt;.
   */
  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();

  /**
   * Groups the rows of the window, computes the aggregates of every group and emits the groups
   * that satisfy all having conditions. Nothing is emitted when no group by column is set.
   * <p>
   * A failing aggregate is reported and left out of the result row. A failing having condition
   * is reported and ends the processing of the window; groups not handled yet are not emitted.
   */
  @Override
  public void endWindow()
  {
    // Detach the rows of this window first, so that no exit path below (in particular the
    // early return on a failing having condition) can carry them over into the next window.
    ArrayList<Map<String, Object>> windowRows = rows;
    rows = new ArrayList<Map<String, Object>>();

    // group names
    if (columnGroupIndexes.isEmpty()) {
      return;
    }

    // group rows
    HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>> groups =
        new HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>>();
    for (Map<String, Object> row : windowRows) {
      MultiKeyCompare key = new MultiKeyCompare();
      for (ColumnIndex index : columnGroupIndexes) {
        key.addCompareKey(row.get(index.getColumn()));
      }
      ArrayList<Map<String, Object>> subRows = groups.get(key);
      if (subRows == null) {
        subRows = new ArrayList<Map<String, Object>>();
        groups.put(key, subRows);
      }
      subRows.add(row);
    }

    // Iterate over groups and emit aggregate values
    for (ArrayList<Map<String, Object>> subRows : groups.values()) {

      // get result; the group by columns are equal in all rows of a group, so take the first
      Map<String, Object> result = new HashMap<String, Object>();
      for (ColumnIndex index : columnGroupIndexes) {
        index.filter(subRows.get(0), result);
      }

      // append aggregate values
      for (FunctionIndex aggregate : aggregates) {
        try {
          aggregate.filter(subRows, result);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }

      // check valid having aggregate
      boolean isValidHaving = true;
      for (HavingCondition having : havingConditions) {
        try {
          isValidHaving &= having.isValidAggregate(subRows);
        } catch (Exception e) {
          e.printStackTrace();
          return;
        }
      }
      if (isValidHaving) {
        outport.emit(result);
      }
    }
  }

  /**
   * Group key made of the values of all group by columns of a row. Two keys are equal when they
   * hold equal values in the same order; {@code null} values are allowed.
   */
  @SuppressWarnings("rawtypes")
  private static class MultiKeyCompare implements Comparable
  {

    /**
     * compare keys.
     */
    ArrayList<Object> compareKeys = new ArrayList<Object>();

    @Override
    public boolean equals(Object other)
    {
      if (this == other) {
        return true;
      }
      if (!(other instanceof MultiKeyCompare)) {
        return false;
      }
      // list equality compares size and elements pairwise and copes with null elements
      return compareKeys.equals(((MultiKeyCompare)other).compareKeys);
    }

    @Override
    public int hashCode()
    {
      int hashCode = 0;
      for (Object key : compareKeys) {
        // a missing column yields a null key; it contributes nothing to the hash
        hashCode += (key == null) ? 0 : key.hashCode();
      }
      return hashCode;
    }

    /**
     * Only tells equal from not equal: returns 0 for an equal key and -1 otherwise, so it does
     * not define an ordering.
     */
    @Override
    public int compareTo(Object other)
    {
      if (this.equals(other)) {
        return 0;
      }
      return -1;
    }

    /**
     * Add compare key.
     */
    public void addCompareKey(Object value)
    {
      compareKeys.add(value);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java deleted file mode 100644 index 883329e..0000000 --- a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package com.datatorrent.lib.streamquery;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.index.Index;

/**
 * An implementation of Operator that reads table row data from two table data input ports. <br>
 * <p>
 * Every arriving row is stored and immediately matched against the rows received so far on the
 * other port during the same window; each pair accepted by the join condition is emitted as one
 * joined row holding the selected columns of both sides.
 * <br>
 * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
 * <b>Partitions : No, </b> will yield wrong result(s). <br>
 * <br>
 * <b>Ports : </b> <br>
 * <b> inport1 : </b> Input port for table 1, expects {@code Map<String, Object>} <br>
 * <b> inport2 : </b> Input port for table 2, expects {@code Map<String, Object>} <br>
 * <b> outport : </b> Output joined row port, emits {@code Map<String, Object>} <br>
 * <br>
 * <b> Properties : </b>
 * <b> joinCondition : </b> Join condition for table rows. <br>
 * <b> table1Columns : </b> Columns to be selected from table1. <br>
 * <b> table2Columns : </b> Columns to be selected from table2. <br>
 * <br>
 * @displayName Inner join
 * @category Stream Manipulators
 * @tags sql, inner join operator
 *
 * @since 0.3.3
 */
@OperatorAnnotation(partitionable = false)
public class InnerJoinOperator implements Operator
{

  /**
   * Condition a pair of rows must satisfy to be joined; {@code null} joins every pair.
   */
  protected Condition joinCondition;

  /**
   * Columns copied from a table 1 row into the joined row.
   */
  private ArrayList<Index> table1Columns = new ArrayList<Index>();

  /**
   * Columns copied from a table 2 row into the joined row.
   */
  private ArrayList<Index> table2Columns = new ArrayList<Index>();

  /**
   * Rows received on input port 1 in the current window.
   */
  protected ArrayList<Map<String, Object>> table1;

  /**
   * Rows received on input port 2 in the current window.
   */
  protected ArrayList<Map<String, Object>> table2;

  /**
   * Input port 1 that takes a map of &lt;string,object&gt;. The row is stored and joined with
   * every table 2 row seen so far.
   */
  public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
  {
    @Override
    public void process(Map<String, Object> tuple)
    {
      table1.add(tuple);
      int pos = 0;
      while (pos < table2.size()) {
        Map<String, Object> right = table2.get(pos);
        pos++;
        if ((joinCondition != null) && !joinCondition.isValidJoin(tuple, right)) {
          continue;
        }
        joinRows(tuple, right);
      }
    }
  };

  /**
   * Input port 2 that takes a map of &lt;string,object&gt;. The row is stored and joined with
   * every table 1 row seen so far.
   */
  public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
  {
    @Override
    public void process(Map<String, Object> tuple)
    {
      table2.add(tuple);
      int pos = 0;
      while (pos < table1.size()) {
        Map<String, Object> left = table1.get(pos);
        pos++;
        if ((joinCondition != null) && !joinCondition.isValidJoin(left, tuple)) {
          continue;
        }
        joinRows(left, tuple);
      }
    }
  };

  /**
   * Output port that emits a map of &lt;string,object&gt;.
   */
  public final transient DefaultOutputPort<Map<String, Object>> outport =
      new DefaultOutputPort<Map<String, Object>>();

  /**
   * Creates the two empty row buffers.
   */
  @Override
  public void setup(OperatorContext arg0)
  {
    table1 = new ArrayList<Map<String, Object>>();
    table2 = new ArrayList<Map<String, Object>>();
  }

  @Override
  public void teardown()
  {
  }

  @Override
  public void beginWindow(long arg0)
  {
  }

  /**
   * Forgets the rows of the finished window; joins never span windows.
   */
  @Override
  public void endWindow()
  {
    table1.clear();
    table2.clear();
  }

  /**
   * @return the joinCondition
   */
  public Condition getJoinCondition()
  {
    return joinCondition;
  }

  /**
   * Pick the supported condition. Currently only equal join is supported.
   * @param joinCondition joinCondition
   */
  public void setJoinCondition(Condition joinCondition)
  {
    this.joinCondition = joinCondition;
  }

  /**
   * Select table1 column name.
   */
  public void selectTable1Column(Index column)
  {
    table1Columns.add(column);
  }

  /**
   * Select table2 column name.
   */
  public void selectTable2Column(Index column)
  {
    table2Columns.add(column);
  }

  /**
   * Builds one joined row from a table 1 row and a table 2 row and emits it. The selected
   * table 1 columns are applied first, then the selected table 2 columns; a {@code null} row
   * contributes nothing.
   */
  protected void joinRows(Map<String, Object> row1, Map<String, Object> row2)
  {
    Map<String, Object> joined = new HashMap<String, Object>();
    project(table1Columns, row1, joined);
    project(table2Columns, row2, joined);
    outport.emit(joined);
  }

  /**
   * Applies every column selection to the row, writing into the target; skips a {@code null} row.
   */
  private void project(ArrayList<Index> columns, Map<String, Object> row, Map<String, Object> target)
  {
    if (row == null) {
      return;
    }
    for (Index column : columns) {
      column.filter(row, target);
    }
  }

}
