Repository: apex-malhar Updated Branches: refs/heads/master e29b7c6de -> 02a441d57
Change unique counter to not emit empty maps. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/02a441d5 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/02a441d5 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/02a441d5 Branch: refs/heads/master Commit: 02a441d57ff8a508e3f80191219a5782abbc193b Parents: e29b7c6 Author: Thomas Weise <[email protected]> Authored: Sun Mar 19 22:04:14 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Tue Mar 21 18:35:40 2017 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/lib/algo/UniqueCounter.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02a441d5/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java index 90b90a2..d029da6 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java @@ -28,14 +28,19 @@ import com.datatorrent.lib.util.BaseUniqueKeyCounter; import com.datatorrent.lib.util.UnifierHashMapSumKeys; /** - * This operator counts the number of times a tuple exists in a window. A map from tuples to counts is emitted at the end of each window. + * This operator counts the number of times a tuple is received and emits + * modified counts (if any) at the end of the streaming window. * <p> - * Counts the number of times a key exists or is added in that given window; Count is emitted for the modified or added keys at end of window in a single HashMap. + * Counts for modified keys are emitted at end of window in a single HashMap. If + * no keys were received in a window, then nothing will be emitted. By default + * the state is cleared at the end of the window. Cumulative counting can be + * configured through the {@link UniqueCounter#setCumulative} property. * </p> * <p> * This is an end of window operator<br> * <br> - * <b>StateFull : yes, </b> Tuples are aggregated over application window(s). <br> + * <b>StateFull : yes, </b> Tuples are aggregated over application window(s). + * <br> * <b>Partitions : Yes, </b> Unique count is unified at output port. <br> * <br> * <b>Ports</b>:<br> @@ -98,7 +103,9 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K> for (K key: inputSet) { tuple.put(key, map.get(key).toInteger()); } - count.emit(tuple); + if (!tuple.isEmpty()) { + count.emit(tuple); + } if (!cumulative) { map.clear(); }
