Repository: storm Updated Branches: refs/heads/master 62dec63b5 -> 50d55a951
[STORM-2731] - Simple checks in Storm Windowing Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91ac5c80 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91ac5c80 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91ac5c80 Branch: refs/heads/master Commit: 91ac5c801a0f000a4ae2313437d4de79ea5bd2bc Parents: 32389d7 Author: Jerry Peng <[email protected]> Authored: Thu Sep 7 15:32:39 2017 -0700 Committer: Jerry Peng <[email protected]> Committed: Tue Sep 19 13:58:40 2017 -0700 ---------------------------------------------------------------------- .../storm/topology/base/BaseWindowedBolt.java | 32 ++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/91ac5c80/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java index c445e9d..918760a 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java +++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java @@ -89,7 +89,14 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { public final int value; public Duration(int value, TimeUnit timeUnit) { - this.value = (int) timeUnit.toMillis(value); + if (value < 0) { + throw new IllegalArgumentException("Duration cannot be negative"); + } + long longVal = timeUnit.toMillis(value); + if (longVal > (long) Integer.MAX_VALUE) { + throw new IllegalArgumentException("Duration is too long"); + } + this.value = (int)longVal; } /** @@ -165,12 +172,14 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { '}'; } } - protected BaseWindowedBolt() { windowConfiguration = new HashMap<>(); } private BaseWindowedBolt withWindowLength(Count count) { + if (count == null) { + throw new IllegalArgumentException("Window length count cannot be set null"); + } if (count.value <= 0) { throw new IllegalArgumentException("Window length must be positive [" + count + "]"); } @@ -179,6 +188,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { } private BaseWindowedBolt withWindowLength(Duration duration) { + if (duration == null) { + throw new IllegalArgumentException("Window length duration cannot be set null"); + } if (duration.value <= 0) { throw new IllegalArgumentException("Window length must be positive [" + duration + "]"); } @@ -188,6 +200,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { } private BaseWindowedBolt withSlidingInterval(Count count) { + if (count == null) { + throw new IllegalArgumentException("Sliding interval count cannot be set null"); + } if (count.value <= 0) { throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]"); } @@ -196,6 +211,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { } private BaseWindowedBolt withSlidingInterval(Duration duration) { + if (duration == null) { + throw new IllegalArgumentException("Sliding interval duration cannot be set null"); + } if (duration.value <= 0) { throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]"); } @@ -282,6 +300,7 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { /** * Specify a field in the tuple that represents the timestamp as a long value. If this * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown. + * The field MUST contain a timestamp in milliseconds * * @param fieldName the name of the field that contains the timestamp */ @@ -295,6 +314,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { * @param timestampExtractor the {@link TimestampExtractor} implementation */ public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) { + if (timestampExtractor == null) { + throw new IllegalArgumentException("Timestamp extractor cannot be set to null"); + } if (this.timestampExtractor != null) { throw new IllegalArgumentException("Window is already configured with a timestamp extractor: " + timestampExtractor); } @@ -316,6 +338,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { * @param streamId the name of the stream used to emit late tuples on */ public BaseWindowedBolt withLateTupleStream(String streamId) { + if (streamId == null) { + throw new IllegalArgumentException("Cannot set late tuple stream id to null"); + } windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId); return this; } @@ -339,6 +364,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt { * @param interval the interval at which watermark events are generated */ public BaseWindowedBolt withWatermarkInterval(Duration interval) { + if (interval == null) { + throw new IllegalArgumentException("Watermark interval cannot be set null"); + } windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value); return this; }
