This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new a81021a [Java Streamlet API] Extend Validations Part II (#3111)
a81021a is described below
commit a81021a763afe5bd5e36f90b46466cfd4736b9e6
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Mon Nov 19 16:05:37 2018 +0000
[Java Streamlet API] Extend Validations Part II (#3111)
* Extends Streamlet Java API Validations Part II
* Rename the `other` parameter to the more meaningful `otherStreamlet`
---
heron/api/src/java/BUILD | 13 +++-
.../apache/heron/streamlet/impl/BuilderImpl.java | 8 +--
.../apache/heron/streamlet/impl/StreamletImpl.java | 80 +++++++++++++++++-----
.../streamlet/impl/streamlets/CustomStreamlet.java | 3 +-
.../heron/streamlet/impl/utils/StreamletUtils.java | 29 ++++++++
.../streamlet/impl/utils/StreamletUtilsTest.java | 39 ++++++++++-
6 files changed, 145 insertions(+), 27 deletions(-)
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index cdab255..0ae795f 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -33,7 +33,8 @@ java_library(
javacopts = DOCLINT_HTML_AND_SYNTAX,
deps = api_deps_files + [
":api-java-low-level",
- "//third_party/java:kryo-neverlink"
+ "//third_party/java:kryo-neverlink",
+ "@org_apache_commons_commons_lang3//jar"
]
)
@@ -42,13 +43,19 @@ java_library(
name = "api-java-low-level-functional",
javacopts = DOCLINT_HTML_AND_SYNTAX,
srcs = glob(["org/apache/heron/api/**/*.java",
"org/apache/heron/streamlet/**/*.java"]),
- deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
+ deps = api_deps_files + [
+ "//third_party/java:kryo-neverlink",
+ "@org_apache_commons_commons_lang3//jar"
+ ]
)
java_binary(
name = "api-unshaded",
srcs = glob(["org/apache/heron/api/**/*.java",
"org/apache/heron/streamlet/**/*.java"]),
- deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
+ deps = api_deps_files + [
+ "//third_party/java:kryo-neverlink",
+ "@org_apache_commons_commons_lang3//jar"
+ ]
)
jarjar_binary(
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 61715e8..029b49e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -30,7 +30,8 @@ import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
-import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+import static
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
/**
* BuilderImpl implements the Builder interface.
@@ -46,7 +47,6 @@ public final class BuilderImpl implements Builder {
@Override
public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
- StreamletUtils.require(supplier != null, "supplier must not be null.");
StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
sources.add(retval);
return retval;
@@ -54,7 +54,6 @@ public final class BuilderImpl implements Builder {
@Override
public <R> Streamlet<R> newSource(Source<R> generator) {
- StreamletUtils.require(generator != null, "source must not be null.");
StreamletImpl<R> retval =
StreamletImpl.createGeneratorStreamlet(generator);
sources.add(retval);
return retval;
@@ -62,7 +61,6 @@ public final class BuilderImpl implements Builder {
@Override
public <R> Streamlet<R> newSource(IRichSpout spout) {
- StreamletUtils.require(spout != null, "spout must not be null.");
StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
sources.add(retval);
return retval;
@@ -78,6 +76,8 @@ public final class BuilderImpl implements Builder {
}
public TopologyBuilder build(TopologyBuilder builder) {
+ checkNotNull(builder, "builder cannot not be null");
+
Set<String> stageNames = new HashSet<>();
for (StreamletImpl<?> streamlet : sources) {
streamlet.build(builder, stageNames);
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index a05040b..f382bcd 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.ArrayList;
@@ -59,7 +58,10 @@ import
org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
-import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+import static
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
/**
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
@@ -151,8 +153,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public Streamlet<R> setName(String sName) {
- StreamletUtils.require(sName != null && !sName.trim().isEmpty(),
- "Streamlet name cannot be null/blank");
+ checkNotBlank(sName, "Streamlet name cannot be null/blank");
+
this.name = sName;
return this;
}
@@ -190,8 +192,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public Streamlet<R> setNumPartitions(int numPartitions) {
- StreamletUtils.require(numPartitions > 0,
- "Streamlet's partitions number should be > 0");
+ require(numPartitions > 0, "Streamlet's partitions number should be > 0");
+
this.nPartitions = numPartitions;
return this;
}
@@ -255,6 +257,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
* @param supplier The Supplier function to generate the elements
*/
static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T>
supplier) {
+ checkNotNull(supplier, "supplier cannot not be null");
+
return new SupplierStreamlet<T>(supplier);
}
@@ -263,6 +267,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
* @param generator The Generator function to generate the elements
*/
static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
+ checkNotNull(generator, "generator cannot not be null");
+
return new SourceStreamlet<T>(generator);
}
@@ -271,6 +277,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
* @param spout The Spout function to generate the elements
*/
static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
+ checkNotNull(spout, "spout cannot not be null");
+
return new SpoutStreamlet<T>(spout);
}
@@ -280,6 +288,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public <T> Streamlet<T> map(SerializableFunction<R, ? extends T> mapFn) {
+ checkNotNull(mapFn, "mapFn cannot be null");
+
MapStreamlet<R, T> retval = new MapStreamlet<>(this, mapFn);
addChild(retval);
return retval;
@@ -293,6 +303,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
@Override
public <T> Streamlet<T> flatMap(
SerializableFunction<R, ? extends Iterable<? extends T>> flatMapFn) {
+ checkNotNull(flatMapFn, "flatMapFn cannot be null");
+
FlatMapStreamlet<R, T> retval = new FlatMapStreamlet<>(this, flatMapFn);
addChild(retval);
return retval;
@@ -305,6 +317,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public Streamlet<R> filter(SerializablePredicate<R> filterFn) {
+ checkNotNull(filterFn, "filterFn cannot be null");
+
FilterStreamlet<R> retval = new FilterStreamlet<>(this, filterFn);
addChild(retval);
return retval;
@@ -325,6 +339,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
@Override
public Streamlet<R> repartition(int numPartitions,
SerializableBiFunction<R, Integer, List<Integer>>
partitionFn) {
+ checkNotNull(partitionFn, "partitionFn cannot be null");
+
RemapStreamlet<R> retval = new RemapStreamlet<>(this, partitionFn);
retval.setNumPartitions(numPartitions);
addChild(retval);
@@ -338,7 +354,7 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public List<Streamlet<R>> clone(int numClones) {
- StreamletUtils.require(numClones > 0,
+ require(numClones > 0,
"Streamlet's clone number should be > 0");
List<Streamlet<R>> retval = new ArrayList<>(numClones);
for (int i = 0; i < numClones; ++i) {
@@ -352,7 +368,7 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
* The join is done over elements accumulated over a time window defined by
windowCfg.
* The elements are compared using the thisKeyExtractor for this streamlet
with the
* otherKeyExtractor for the other streamlet. On each matching pair, the
joinFunction is applied.
- * @param other The Streamlet that we are joining with.
+ * @param otherStreamlet The Streamlet that we are joining with.
* @param thisKeyExtractor The function applied to a tuple of this streamlet
to get the key
* @param otherKeyExtractor The function applied to a tuple of the other
streamlet to get the key
* @param windowCfg This is a specification of what kind of windowing
strategy you like to
@@ -361,10 +377,16 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
- join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
+ join(Streamlet<S> otherStreamlet, SerializableFunction<R, K>
thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig
windowCfg,
SerializableBiFunction<R, S, ? extends T> joinFunction) {
- return join(other, thisKeyExtractor, otherKeyExtractor,
+ checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+ checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
+ checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
+ checkNotNull(windowCfg, "windowCfg cannot be null");
+ checkNotNull(joinFunction, "joinFunction cannot be null");
+
+ return join(otherStreamlet, thisKeyExtractor, otherKeyExtractor,
windowCfg, JoinType.INNER, joinFunction);
}
@@ -375,7 +397,7 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
* The elements are compared using the thisKeyExtractor for this streamlet
with the
* otherKeyExtractor for the other streamlet. On each matching pair, the
joinFunction is applied.
* Types of joins {@link JoinType}
- * @param other The Streamlet that we are joining with.
+ * @param otherStreamlet The Streamlet that we are joining with.
* @param thisKeyExtractor The function applied to a tuple of this streamlet
to get the key
* @param otherKeyExtractor The function applied to a tuple of the other
streamlet to get the key
* @param windowCfg This is a specification of what kind of windowing
strategy you like to
@@ -385,11 +407,17 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>>
- join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor,
+ join(Streamlet<S> otherStreamlet, SerializableFunction<R, K>
thisKeyExtractor,
SerializableFunction<S, K> otherKeyExtractor, WindowConfig
windowCfg,
JoinType joinType, SerializableBiFunction<R, S, ? extends T>
joinFunction) {
-
- StreamletImpl<S> joinee = (StreamletImpl<S>) other;
+ checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+ checkNotNull(thisKeyExtractor, "thisKeyExtractor cannot be null");
+ checkNotNull(otherKeyExtractor, "otherKeyExtractor cannot be null");
+ checkNotNull(windowCfg, "windowCfg cannot be null");
+ checkNotNull(joinType, "joinType cannot be null");
+ checkNotNull(joinFunction, "joinFunction cannot be null");
+
+ StreamletImpl<S> joinee = (StreamletImpl<S>) otherStreamlet;
JoinStreamlet<K, R, S, T> retval = JoinStreamlet.createJoinStreamlet(
this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg,
joinType, joinFunction);
addChild(retval);
@@ -411,6 +439,11 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V>
valueExtractor,
WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
+ checkNotNull(keyExtractor, "keyExtractor cannot be null");
+ checkNotNull(valueExtractor, "valueExtractor cannot be null");
+ checkNotNull(windowCfg, "windowCfg cannot be null");
+ checkNotNull(reduceFn, "reduceFn cannot be null");
+
ReduceByKeyAndWindowStreamlet<K, V, R> retval =
new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
windowCfg, reduceFn);
@@ -435,6 +468,11 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg,
T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
+ checkNotNull(keyExtractor, "keyExtractor cannot be null");
+ checkNotNull(windowCfg, "windowCfg cannot be null");
+ checkNotNull(identity, "identity cannot be null");
+ checkNotNull(reduceFn, "reduceFn cannot be null");
+
GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval =
new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor,
windowCfg,
identity, reduceFn);
@@ -447,8 +485,10 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
* the new streamlet will contain tuples belonging to both Streamlets
*/
@Override
- public Streamlet<R> union(Streamlet<? extends R> other) {
- StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) other;
+ public Streamlet<R> union(Streamlet<? extends R> otherStreamlet) {
+ checkNotNull(otherStreamlet, "otherStreamlet cannot be null");
+
+ StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>)
otherStreamlet;
UnionStreamlet<R> retval = new UnionStreamlet<>(this, joinee);
addChild(retval);
joinee.addChild(retval);
@@ -472,6 +512,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public void consume(SerializableConsumer<R> consumer) {
+ checkNotNull(consumer, "consumer cannot be null");
+
ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<>(this,
consumer);
addChild(consumerStreamlet);
}
@@ -482,6 +524,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public void toSink(Sink<R> sink) {
+ checkNotNull(sink, "sink cannot be null");
+
SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<>(this, sink);
addChild(sinkStreamlet);
}
@@ -497,6 +541,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
@Override
public <T> Streamlet<T> transform(
SerializableTransformer<R, ? extends T> serializableTransformer) {
+ checkNotNull(serializableTransformer, "serializableTransformer cannot be
null");
+
TransformStreamlet<R, T> transformStreamlet =
new TransformStreamlet<>(this, serializableTransformer);
addChild(transformStreamlet);
@@ -511,6 +557,8 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
+ checkNotNull(operator, "operator cannot be null");
+
StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
addChild(customStreamlet);
return customStreamlet;
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index b0f71c0..dbeea14 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
@@ -40,7 +39,7 @@ public class CustomStreamlet<R, T> extends StreamletImpl<T> {
/**
* Create a custom streamlet from user defined CustomOperator object.
* @param parent The parent(upstream) streamlet object
- * @param operator The user defined CustomeOperator
+ * @param operator The user defined CustomOperator
*/
public CustomStreamlet(StreamletImpl<R> parent,
IStreamletOperator<R, T> operator) {
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
index 7910439..76792fc 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/utils/StreamletUtils.java
@@ -19,6 +19,8 @@
package org.apache.heron.streamlet.impl.utils;
+import org.apache.commons.lang3.StringUtils;
+
public final class StreamletUtils {
private StreamletUtils() {
@@ -36,4 +38,31 @@ public final class StreamletUtils {
}
}
+ /**
+ * Verifies that the given text is not blank (null, empty or whitespace only).
+ * @param text The text to verify
+ * @param errorMessage The error message
+ * @throws IllegalArgumentException if the requirement fails
+ */
+ public static String checkNotBlank(String text, String errorMessage) {
+ if (StringUtils.isBlank(text)) {
+ throw new IllegalArgumentException(errorMessage);
+ } else {
+ return text;
+ }
+ }
+
+ /**
+ * Verifies that the given reference is not null.
+ * @param reference The reference to verify
+ * @param errorMessage The error message
+ * @throws NullPointerException if the requirement fails
+ */
+ public static <T> T checkNotNull(T reference, String errorMessage) {
+ if (reference == null) {
+ throw new NullPointerException(errorMessage);
+ }
+ return reference;
+ }
+
}
diff --git
a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
index c1b93ce..9f13c16 100644
---
a/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
+++
b/heron/api/tests/java/org/apache/heron/streamlet/impl/utils/StreamletUtilsTest.java
@@ -21,18 +21,53 @@ package org.apache.heron.streamlet.impl.utils;
import org.junit.Test;
+import static
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
+import static
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
+import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
+
public class StreamletUtilsTest {
@Test
public void testRequire() {
String text = "test_text";
- StreamletUtils.require(!text.isEmpty(), "text should not be blank");
+ require(!text.isEmpty(), "text should not be blank");
}
@Test(expected = IllegalArgumentException.class)
public void testRequireWithNegativeCase() {
String text = "";
- StreamletUtils.require(!text.isEmpty(), "text should not be blank");
+ require(!text.isEmpty(), "text should not be blank");
+ }
+
+ @Test
+ public void testCheckNotBlank() {
+ checkNotBlank("test_text", "text should not be blank");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCheckNotBlankWithNullReference() {
+ checkNotBlank(null, "text should not be blank");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCheckNotBlankWithEmptyString() {
+ checkNotBlank("", "text should not be blank");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCheckNotBlankWithBlankString() {
+ checkNotBlank(" ", "text should not be blank");
+ }
+
+ @Test
+ public void testCheckNotNull() {
+ checkNotNull(new String(), "text should not be null");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCheckNotNullWithNullReference() {
+ String text = null;
+ checkNotNull(text, "text should not be null");
}
}