nwangtw closed pull request #3111: [Java Streamlet API] Extend Validations Part 
II
URL: https://github.com/apache/incubator-heron/pull/3111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index cdab255fcf..0ae795f8e3 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -33,7 +33,8 @@ java_library(
     javacopts = DOCLINT_HTML_AND_SYNTAX,
     deps = api_deps_files + [
         ":api-java-low-level",
-        "//third_party/java:kryo-neverlink"
+        "//third_party/java:kryo-neverlink",
+        "@org_apache_commons_commons_lang3//jar"
     ]
 )
 
@@ -42,13 +43,19 @@ java_library(
     name = "api-java-low-level-functional",
     javacopts = DOCLINT_HTML_AND_SYNTAX,
     srcs = glob(["org/apache/heron/api/**/*.java", 
"org/apache/heron/streamlet/**/*.java"]),
-    deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
+    deps = api_deps_files + [
+        "//third_party/java:kryo-neverlink",
+        "@org_apache_commons_commons_lang3//jar"
+    ]
 )
 
 java_binary(
     name = "api-unshaded",
     srcs = glob(["org/apache/heron/api/**/*.java", 
"org/apache/heron/streamlet/**/*.java"]),
-    deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
+    deps = api_deps_files + [
+        "//third_party/java:kryo-neverlink",
+        "@org_apache_commons_commons_lang3//jar"
+    ]
 )
 
 jarjar_binary(
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 61715e8528..029b49e796 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -30,7 +30,8 @@
 import org.apache.heron.streamlet.SerializableSupplier;
 import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
-import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
 
 /**
  * BuilderImpl implements the Builder interface.
@@ -46,7 +47,6 @@ public BuilderImpl() {
 
   @Override
   public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
-    StreamletUtils.require(supplier != null, "supplier must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
     sources.add(retval);
     return retval;
@@ -54,7 +54,6 @@ public BuilderImpl() {
 
   @Override
   public <R> Streamlet<R> newSource(Source<R> generator) {
-    StreamletUtils.require(generator != null, "source must not be null.");
     StreamletImpl<R> retval = 
StreamletImpl.createGeneratorStreamlet(generator);
     sources.add(retval);
     return retval;
@@ -62,7 +61,6 @@ public BuilderImpl() {
 
   @Override
   public <R> Streamlet<R> newSource(IRichSpout spout) {
-    StreamletUtils.require(spout != null, "spout must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
     sources.add(retval);
     return retval;
@@ -78,6 +76,8 @@ public TopologyBuilder build() {
   }
 
   public TopologyBuilder build(TopologyBuilder builder) {
+    checkNotNull(builder, "builder cannot not be null");
+
     Set<String> stageNames = new HashSet<>();
     for (StreamletImpl<?> streamlet : sources) {
       streamlet.build(builder, stageNames);
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index a05040b6a9..f382bcd4cf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl;
 
 import java.util.ArrayList;
@@ -59,7 +58,10 @@
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
-import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
 
 /**
  * A Streamlet is a (potentially unbounded) ordered collection of tuples.
@@ -151,8 +153,8 @@ public String toString() {
    */
   @Override
   public Streamlet<R> setName(String sName) {
-    StreamletUtils.require(sName != null && !sName.trim().isEmpty(),
-        "Streamlet name cannot be null/blank");
+    checkNotBlank(sName, "Streamlet name cannot be null/blank");
+
     this.name = sName;
     return this;
   }
@@ -190,8 +192,8 @@ protected void setDefaultNameIfNone(StreamletNamePrefix 
prefix, Set<String> stag
    */
   @Override
   public Streamlet<R> setNumPartitions(int numPartitions) {
-    StreamletUtils.require(numPartitions > 0,
-        "Streamlet's partitions number should be > 0");
+    require(numPartitions > 0, "Streamlet's partitions number should be > 0");
+
     this.nPartitions = numPartitions;
     return this;
   }
@@ -255,6 +257,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    * @param supplier The Supplier function to generate the elements
    */
   static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> 
supplier) {
+    checkNotNull(supplier, "supplier cannot not be null");
+
     return new SupplierStreamlet<T>(supplier);
   }
 
@@ -263,6 +267,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    * @param generator The Generator function to generate the elements
    */
   static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
+    checkNotNull(generator, "generator cannot not be null");
+
     return new SourceStreamlet<T>(generator);
   }
 
@@ -271,6 +277,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    * @param spout The Spout function to generate the elements
    */
   static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
+    checkNotNull(spout, "spout cannot not be null");
+
     return new SpoutStreamlet<T>(spout);
   }
 
@@ -280,6 +288,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
   */
   @Override
   public <T> Streamlet<T> map(SerializableFunction<R, ? extends T> mapFn) {
+    checkNotNull(mapFn, "mapFn cannot be null");
+
     MapStreamlet<R, T> retval = new MapStreamlet<>(this, mapFn);
     addChild(retval);
     return retval;
@@ -293,6 +303,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
   @Override
   public <T> Streamlet<T> flatMap(
       SerializableFunction<R, ? extends Iterable<? extends T>> flatMapFn) {
+    checkNotNull(flatMapFn, "flatMapFn cannot be null");
+
     FlatMapStreamlet<R, T> retval = new FlatMapStreamlet<>(this, flatMapFn);
     addChild(retval);
     return retval;
@@ -305,6 +317,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
   */
   @Override
   public Streamlet<R> filter(SerializablePredicate<R> filterFn) {
+    checkNotNull(filterFn, "filterFn cannot be null");
+
     FilterStreamlet<R> retval = new FilterStreamlet<>(this, filterFn);
     addChild(retval);
     return retval;
@@ -325,6 +339,8 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
   @Override
   public Streamlet<R> repartition(int numPartitions,
                            SerializableBiFunction<R, Integer, List<Integer>> 
partitionFn) {
+    checkNotNull(partitionFn, "partitionFn cannot be null");
+
     RemapStreamlet<R> retval = new RemapStreamlet<>(this, partitionFn);
     retval.setNumPartitions(numPartitions);
     addChild(retval);
@@ -338,7 +354,7 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    */
   @Override
   public List<Streamlet<R>> clone(int numClones) {
-    StreamletUtils.require(numClones > 0,
+    require(numClones > 0,
         "Streamlet's clone number should be > 0");
     List<Streamlet<R>> retval = new ArrayList<>(numClones);
     for (int i = 0; i < numClones; ++i) {
@@ -352,7 +368,7 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    * The join is done over elements accumulated over a time window defined by 
windowCfg.
    * The elements are compared using the thisKeyExtractor for this streamlet 
with the
    * otherKeyExtractor for the other streamlet. On each matching pair, the 
joinFunction is applied.
-   * @param other The Streamlet that we are joining with.
+   * @param otherStreamlet The Streamlet that we are joining with.
    * @param thisKeyExtractor The function applied to a tuple of this streamlet 
to get the key
    * @param otherKeyExtractor The function applied to a tuple of the other 
streamlet to get the key
    * @param windowCfg This is a specification of what kind of windowing 
strategy you like to
@@ -361,10 +377,16 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    */
   @Override
   public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
-        join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
+        join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> 
thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig 
windowCfg,
              SerializableBiFunction<R, S, ? extends T> joinFunction) {
-    return join(other, thisKeyExtractor, otherKeyExtractor,
+    checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+    checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
+    checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(joinFunction, "joinFunction cannot be null");
+
+    return join(otherStreamlet, thisKeyExtractor, otherKeyExtractor,
         windowCfg, JoinType.INNER, joinFunction);
   }
 
@@ -375,7 +397,7 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    * The elements are compared using the thisKeyExtractor for this streamlet 
with the
    * otherKeyExtractor for the other streamlet. On each matching pair, the 
joinFunction is applied.
    * Types of joins {@link JoinType}
-   * @param other The Streamlet that we are joining with.
+   * @param otherStreamlet The Streamlet that we are joining with.
    * @param thisKeyExtractor The function applied to a tuple of this streamlet 
to get the key
    * @param otherKeyExtractor The function applied to a tuple of the other 
streamlet to get the key
    * @param windowCfg This is a specification of what kind of windowing 
strategy you like to
@@ -385,11 +407,17 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    */
   @Override
   public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
-        join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
+        join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> 
thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig 
windowCfg,
              JoinType joinType, SerializableBiFunction<R, S, ? extends T> 
joinFunction) {
-
-    StreamletImpl<S> joinee = (StreamletImpl<S>) other;
+    checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+    checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
+    checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(joinType, "joinType cannot be null");
+    checkNotNull(joinFunction, "joinFunction cannot be null");
+
+    StreamletImpl<S> joinee = (StreamletImpl<S>) otherStreamlet;
     JoinStreamlet<K, R, S, T> retval = JoinStreamlet.createJoinStreamlet(
         this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, 
joinType, joinFunction);
     addChild(retval);
@@ -411,6 +439,11 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
   public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> 
valueExtractor,
       WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(valueExtractor, "valueExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(reduceFn, "reduceFn cannot be null");
+
     ReduceByKeyAndWindowStreamlet<K, V, R> retval =
         new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
             windowCfg, reduceFn);
@@ -435,6 +468,11 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
   public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
       T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(identity, "identity cannot be null");
+    checkNotNull(reduceFn, "reduceFn cannot be null");
+
     GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval =
         new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, 
windowCfg,
             identity, reduceFn);
@@ -447,8 +485,10 @@ private String defaultNameCalculator(StreamletNamePrefix 
prefix, Set<String> sta
    * the new streamlet will contain tuples belonging to both Streamlets
   */
   @Override
-  public Streamlet<R> union(Streamlet<? extends R> other) {
-    StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) other;
+  public Streamlet<R> union(Streamlet<? extends R> otherStreamlet) {
+    checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+
+    StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) 
otherStreamlet;
     UnionStreamlet<R> retval = new UnionStreamlet<>(this, joinee);
     addChild(retval);
     joinee.addChild(retval);
@@ -472,6 +512,8 @@ public void log() {
    */
   @Override
   public void consume(SerializableConsumer<R> consumer) {
+    checkNotNull(consumer, "consumer cannot be null");
+
     ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<>(this, 
consumer);
     addChild(consumerStreamlet);
   }
@@ -482,6 +524,8 @@ public void consume(SerializableConsumer<R> consumer) {
    */
   @Override
   public void toSink(Sink<R> sink) {
+    checkNotNull(sink, "sink cannot be null");
+
     SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<>(this, sink);
     addChild(sinkStreamlet);
   }
@@ -497,6 +541,8 @@ public void toSink(Sink<R> sink) {
   @Override
   public <T> Streamlet<T> transform(
       SerializableTransformer<R, ? extends T> serializableTransformer) {
+    checkNotNull(serializableTransformer, "serializableTransformer cannot be 
null");
+
     TransformStreamlet<R, T> transformStreamlet =
         new TransformStreamlet<>(this, serializableTransformer);
     addChild(transformStreamlet);
@@ -511,6 +557,8 @@ public void toSink(Sink<R> sink) {
    */
   @Override
   public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
+    checkNotNull(operator, "operator cannot be null");
+
     StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
     addChild(customStreamlet);
     return customStreamlet;
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index b0f71c0050..dbeea14a2e 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.streamlets;
 
 import java.util.Set;
@@ -40,7 +39,7 @@
   /**
    * Create a custom streamlet from user defined CustomOperator object.
    * @param parent The parent(upstream) streamlet object
-   * @param operator The user defined CustomeOperator
+   * @param operator The user defined CustomOperator
    */
   public CustomStreamlet(StreamletImpl<R> parent,
                          IStreamletOperator<R, T> operator) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
index 79104394c7..76792fcabe 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.heron.streamlet.impl.utils;
 
+import org.apache.commons.lang3.StringUtils;
+
 public final class StreamletUtils {
 
   private StreamletUtils() {
@@ -36,4 +38,31 @@ public static void require(Boolean requirement, String 
errorMessage) {
     }
   }
 
+  /**
+   * Verifies not blank text as the utility function.
+   * @param text The text to verify
+   * @param errorMessage The error message
+   * @throws IllegalArgumentException if the requirement fails
+   */
+  public static String checkNotBlank(String text, String errorMessage) {
+    if (StringUtils.isBlank(text)) {
+      throw new IllegalArgumentException(errorMessage);
+    } else {
+      return text;
+    }
+  }
+
+  /**
+   * Verifies not null reference as the utility function.
+   * @param reference The reference to verify
+   * @param errorMessage The error message
+   * @throws NullPointerException if the requirement fails
+   */
+  public static <T> T checkNotNull(T reference, String errorMessage) {
+    if (reference == null) {
+      throw new NullPointerException(errorMessage);
+    }
+    return reference;
+  }
+
 }
diff --git 
a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
 
b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
index c1b93ce478..9f13c1670d 100644
--- 
a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
+++ 
b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
@@ -21,18 +21,53 @@
 
 import org.junit.Test;
 
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
+
 public class StreamletUtilsTest {
 
   @Test
   public void testRequire() {
     String text = "test_text";
-    StreamletUtils.require(!text.isEmpty(), "text should not be blank");
+    require(!text.isEmpty(), "text should not be blank");
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testRequireWithNegativeCase() {
     String text = "";
-    StreamletUtils.require(!text.isEmpty(), "text should not be blank");
+    require(!text.isEmpty(), "text should not be blank");
+  }
+
+  @Test
+  public void testCheckNotBlank() {
+    checkNotBlank("test_text", "text should not be blank");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckNotBlankWithNullReference() {
+    checkNotBlank(null, "text should not be blank");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckNotBlankWithEmptyString() {
+    checkNotBlank("", "text should not be blank");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckNotBlankWithBlankString() {
+    checkNotBlank(" ", "text should not be blank");
+  }
+
+  @Test
+  public void testCheckNotNull() {
+    checkNotNull(new String(), "text should not be null");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckNotNullWithNullReference() {
+    String text = null;
+    checkNotNull(text, "text should not be null");
   }
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to