This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new a81021a  [Java Streamlet API] Extend Validations Part II (#3111)
a81021a is described below

commit a81021a763afe5bd5e36f90b46466cfd4736b9e6
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Mon Nov 19 16:05:37 2018 +0000

    [Java Streamlet API] Extend Validations Part II (#3111)
    
    * Extends Streamlet Java API Validations Part II
    
    * Rename the other-streamlet parameter to the more meaningful otherStreamlet
---
 heron/api/src/java/BUILD                           | 13 +++-
 .../apache/heron/streamlet/impl/BuilderImpl.java   |  8 +--
 .../apache/heron/streamlet/impl/StreamletImpl.java | 80 +++++++++++++++++-----
 .../streamlet/impl/streamlets/CustomStreamlet.java |  3 +-
 .../heron/streamlet/impl/utils/StreamletUtils.java | 29 ++++++++
 .../streamlet/impl/utils/StreamletUtilsTest.java   | 39 ++++++++++-
 6 files changed, 145 insertions(+), 27 deletions(-)

diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index cdab255..0ae795f 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -33,7 +33,8 @@ java_library(
     javacopts = DOCLINT_HTML_AND_SYNTAX,
     deps = api_deps_files + [
         ":api-java-low-level",
-        "//third_party/java:kryo-neverlink"
+        "//third_party/java:kryo-neverlink",
+        "@org_apache_commons_commons_lang3//jar"
     ]
 )
 
@@ -42,13 +43,19 @@ java_library(
     name = "api-java-low-level-functional",
     javacopts = DOCLINT_HTML_AND_SYNTAX,
     srcs = glob(["org/apache/heron/api/**/*.java", 
"org/apache/heron/streamlet/**/*.java"]),
-    deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
+    deps = api_deps_files + [
+        "//third_party/java:kryo-neverlink",
+        "@org_apache_commons_commons_lang3//jar"
+    ]
 )
 
 java_binary(
     name = "api-unshaded",
     srcs = glob(["org/apache/heron/api/**/*.java", 
"org/apache/heron/streamlet/**/*.java"]),
-    deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
+    deps = api_deps_files + [
+        "//third_party/java:kryo-neverlink",
+        "@org_apache_commons_commons_lang3//jar"
+    ]
 )
 
 jarjar_binary(
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 61715e8..029b49e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -30,7 +30,8 @@ import org.apache.heron.streamlet.Builder;
 import org.apache.heron.streamlet.SerializableSupplier;
 import org.apache.heron.streamlet.Source;
 import org.apache.heron.streamlet.Streamlet;
-import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
 
 /**
  * BuilderImpl implements the Builder interface.
@@ -46,7 +47,6 @@ public final class BuilderImpl implements Builder {
 
   @Override
   public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
-    StreamletUtils.require(supplier != null, "supplier must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
     sources.add(retval);
     return retval;
@@ -54,7 +54,6 @@ public final class BuilderImpl implements Builder {
 
   @Override
   public <R> Streamlet<R> newSource(Source<R> generator) {
-    StreamletUtils.require(generator != null, "source must not be null.");
     StreamletImpl<R> retval = 
StreamletImpl.createGeneratorStreamlet(generator);
     sources.add(retval);
     return retval;
@@ -62,7 +61,6 @@ public final class BuilderImpl implements Builder {
 
   @Override
   public <R> Streamlet<R> newSource(IRichSpout spout) {
-    StreamletUtils.require(spout != null, "spout must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
     sources.add(retval);
     return retval;
@@ -78,6 +76,8 @@ public final class BuilderImpl implements Builder {
   }
 
   public TopologyBuilder build(TopologyBuilder builder) {
+    checkNotNull(builder, "builder cannot not be null");
+
     Set<String> stageNames = new HashSet<>();
     for (StreamletImpl<?> streamlet : sources) {
       streamlet.build(builder, stageNames);
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index a05040b..f382bcd 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl;
 
 import java.util.ArrayList;
@@ -59,7 +58,10 @@ import 
org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
-import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
 
 /**
  * A Streamlet is a (potentially unbounded) ordered collection of tuples.
@@ -151,8 +153,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public Streamlet<R> setName(String sName) {
-    StreamletUtils.require(sName != null && !sName.trim().isEmpty(),
-        "Streamlet name cannot be null/blank");
+    checkNotBlank(sName, "Streamlet name cannot be null/blank");
+
     this.name = sName;
     return this;
   }
@@ -190,8 +192,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public Streamlet<R> setNumPartitions(int numPartitions) {
-    StreamletUtils.require(numPartitions > 0,
-        "Streamlet's partitions number should be > 0");
+    require(numPartitions > 0, "Streamlet's partitions number should be > 0");
+
     this.nPartitions = numPartitions;
     return this;
   }
@@ -255,6 +257,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    * @param supplier The Supplier function to generate the elements
    */
   static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> 
supplier) {
+    checkNotNull(supplier, "supplier cannot not be null");
+
     return new SupplierStreamlet<T>(supplier);
   }
 
@@ -263,6 +267,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    * @param generator The Generator function to generate the elements
    */
   static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
+    checkNotNull(generator, "generator cannot not be null");
+
     return new SourceStreamlet<T>(generator);
   }
 
@@ -271,6 +277,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    * @param spout The Spout function to generate the elements
    */
   static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
+    checkNotNull(spout, "spout cannot not be null");
+
     return new SpoutStreamlet<T>(spout);
   }
 
@@ -280,6 +288,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   */
   @Override
   public <T> Streamlet<T> map(SerializableFunction<R, ? extends T> mapFn) {
+    checkNotNull(mapFn, "mapFn cannot be null");
+
     MapStreamlet<R, T> retval = new MapStreamlet<>(this, mapFn);
     addChild(retval);
     return retval;
@@ -293,6 +303,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   @Override
   public <T> Streamlet<T> flatMap(
       SerializableFunction<R, ? extends Iterable<? extends T>> flatMapFn) {
+    checkNotNull(flatMapFn, "flatMapFn cannot be null");
+
     FlatMapStreamlet<R, T> retval = new FlatMapStreamlet<>(this, flatMapFn);
     addChild(retval);
     return retval;
@@ -305,6 +317,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   */
   @Override
   public Streamlet<R> filter(SerializablePredicate<R> filterFn) {
+    checkNotNull(filterFn, "filterFn cannot be null");
+
     FilterStreamlet<R> retval = new FilterStreamlet<>(this, filterFn);
     addChild(retval);
     return retval;
@@ -325,6 +339,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   @Override
   public Streamlet<R> repartition(int numPartitions,
                            SerializableBiFunction<R, Integer, List<Integer>> 
partitionFn) {
+    checkNotNull(partitionFn, "partitionFn cannot be null");
+
     RemapStreamlet<R> retval = new RemapStreamlet<>(this, partitionFn);
     retval.setNumPartitions(numPartitions);
     addChild(retval);
@@ -338,7 +354,7 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public List<Streamlet<R>> clone(int numClones) {
-    StreamletUtils.require(numClones > 0,
+    require(numClones > 0,
         "Streamlet's clone number should be > 0");
     List<Streamlet<R>> retval = new ArrayList<>(numClones);
     for (int i = 0; i < numClones; ++i) {
@@ -352,7 +368,7 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    * The join is done over elements accumulated over a time window defined by 
windowCfg.
    * The elements are compared using the thisKeyExtractor for this streamlet 
with the
    * otherKeyExtractor for the other streamlet. On each matching pair, the 
joinFunction is applied.
-   * @param other The Streamlet that we are joining with.
+   * @param otherStreamlet The Streamlet that we are joining with.
    * @param thisKeyExtractor The function applied to a tuple of this streamlet 
to get the key
    * @param otherKeyExtractor The function applied to a tuple of the other 
streamlet to get the key
    * @param windowCfg This is a specification of what kind of windowing 
strategy you like to
@@ -361,10 +377,16 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
-        join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
+        join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> 
thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig 
windowCfg,
              SerializableBiFunction<R, S, ? extends T> joinFunction) {
-    return join(other, thisKeyExtractor, otherKeyExtractor,
+    checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+    checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
+    checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(joinFunction, "joinFunction cannot be null");
+
+    return join(otherStreamlet, thisKeyExtractor, otherKeyExtractor,
         windowCfg, JoinType.INNER, joinFunction);
   }
 
@@ -375,7 +397,7 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    * The elements are compared using the thisKeyExtractor for this streamlet 
with the
    * otherKeyExtractor for the other streamlet. On each matching pair, the 
joinFunction is applied.
    * Types of joins {@link JoinType}
-   * @param other The Streamlet that we are joining with.
+   * @param otherStreamlet The Streamlet that we are joining with.
    * @param thisKeyExtractor The function applied to a tuple of this streamlet 
to get the key
    * @param otherKeyExtractor The function applied to a tuple of the other 
streamlet to get the key
    * @param windowCfg This is a specification of what kind of windowing 
strategy you like to
@@ -385,11 +407,17 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
-        join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
+        join(Streamlet<S> otherStreamlet, SerializableFunction<R, K> 
thisKeyExtractor,
              SerializableFunction<S, K> otherKeyExtractor, WindowConfig 
windowCfg,
              JoinType joinType, SerializableBiFunction<R, S, ? extends T> 
joinFunction) {
-
-    StreamletImpl<S> joinee = (StreamletImpl<S>) other;
+    checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+    checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
+    checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(joinType, "joinType cannot be null");
+    checkNotNull(joinFunction, "joinFunction cannot be null");
+
+    StreamletImpl<S> joinee = (StreamletImpl<S>) otherStreamlet;
     JoinStreamlet<K, R, S, T> retval = JoinStreamlet.createJoinStreamlet(
         this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, 
joinType, joinFunction);
     addChild(retval);
@@ -411,6 +439,11 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> 
valueExtractor,
       WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(valueExtractor, "valueExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(reduceFn, "reduceFn cannot be null");
+
     ReduceByKeyAndWindowStreamlet<K, V, R> retval =
         new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
             windowCfg, reduceFn);
@@ -435,6 +468,11 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
       SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
       T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
+    checkNotNull(keyExtractor, "keyExtractor cannot be null");
+    checkNotNull(windowCfg, "windowCfg cannot be null");
+    checkNotNull(identity, "identity cannot be null");
+    checkNotNull(reduceFn, "reduceFn cannot be null");
+
     GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval =
         new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, 
windowCfg,
             identity, reduceFn);
@@ -447,8 +485,10 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    * the new streamlet will contain tuples belonging to both Streamlets
   */
   @Override
-  public Streamlet<R> union(Streamlet<? extends R> other) {
-    StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) other;
+  public Streamlet<R> union(Streamlet<? extends R> otherStreamlet) {
+    checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+
+    StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) 
otherStreamlet;
     UnionStreamlet<R> retval = new UnionStreamlet<>(this, joinee);
     addChild(retval);
     joinee.addChild(retval);
@@ -472,6 +512,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public void consume(SerializableConsumer<R> consumer) {
+    checkNotNull(consumer, "consumer cannot be null");
+
     ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<>(this, 
consumer);
     addChild(consumerStreamlet);
   }
@@ -482,6 +524,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public void toSink(Sink<R> sink) {
+    checkNotNull(sink, "sink cannot be null");
+
     SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<>(this, sink);
     addChild(sinkStreamlet);
   }
@@ -497,6 +541,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   @Override
   public <T> Streamlet<T> transform(
       SerializableTransformer<R, ? extends T> serializableTransformer) {
+    checkNotNull(serializableTransformer, "serializableTransformer cannot be 
null");
+
     TransformStreamlet<R, T> transformStreamlet =
         new TransformStreamlet<>(this, serializableTransformer);
     addChild(transformStreamlet);
@@ -511,6 +557,8 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
+    checkNotNull(operator, "operator cannot be null");
+
     StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
     addChild(customStreamlet);
     return customStreamlet;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index b0f71c0..dbeea14 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.streamlets;
 
 import java.util.Set;
@@ -40,7 +39,7 @@ public class CustomStreamlet<R, T> extends StreamletImpl<T> {
   /**
    * Create a custom streamlet from user defined CustomOperator object.
    * @param parent The parent(upstream) streamlet object
-   * @param operator The user defined CustomeOperator
+   * @param operator The user defined CustomOperator
    */
   public CustomStreamlet(StreamletImpl<R> parent,
                          IStreamletOperator<R, T> operator) {
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
index 7910439..76792fc 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.heron.streamlet.impl.utils;
 
+import org.apache.commons.lang3.StringUtils;
+
 public final class StreamletUtils {
 
   private StreamletUtils() {
@@ -36,4 +38,31 @@ public final class StreamletUtils {
     }
   }
 
+  /**
+   * Verifies that the given text is not blank (null, empty or whitespace-only) and returns it.
+   * @param text The text to verify
+   * @param errorMessage The error message
+   * @throws IllegalArgumentException if the requirement fails
+   */
+  public static String checkNotBlank(String text, String errorMessage) {
+    if (StringUtils.isBlank(text)) {
+      throw new IllegalArgumentException(errorMessage);
+    } else {
+      return text;
+    }
+  }
+
+  /**
+   * Verifies that the given reference is not null and returns it.
+   * @param reference The reference to verify
+   * @param errorMessage The error message
+   * @throws NullPointerException if the requirement fails
+   */
+  public static <T> T checkNotNull(T reference, String errorMessage) {
+    if (reference == null) {
+      throw new NullPointerException(errorMessage);
+    }
+    return reference;
+  }
+
 }
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
index c1b93ce..9f13c16 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
@@ -21,18 +21,53 @@ package org.apache.heron.streamlet.impl.utils;
 
 import org.junit.Test;
 
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static 
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
+
 public class StreamletUtilsTest {
 
   @Test
   public void testRequire() {
     String text = "test_text";
-    StreamletUtils.require(!text.isEmpty(), "text should not be blank");
+    require(!text.isEmpty(), "text should not be blank");
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testRequireWithNegativeCase() {
     String text = "";
-    StreamletUtils.require(!text.isEmpty(), "text should not be blank");
+    require(!text.isEmpty(), "text should not be blank");
+  }
+
+  @Test
+  public void testCheckNotBlank() {
+    checkNotBlank("test_text", "text should not be blank");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckNotBlankWithNullReference() {
+    checkNotBlank(null, "text should not be blank");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckNotBlankWithEmptyString() {
+    checkNotBlank("", "text should not be blank");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckNotBlankWithBlankString() {
+    checkNotBlank(" ", "text should not be blank");
+  }
+
+  @Test
+  public void testCheckNotNull() {
+    checkNotNull(new String(), "text should not be null");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckNotNullWithNullReference() {
+    String text = null;
+    checkNotNull(text, "text should not be null");
   }
 
 }

Reply via email to