nwangtw closed pull request #3115: [Java Streamlet API] Move Source Logics to
Builder
URL: https://github.com/apache/incubator-heron/pull/3115
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub does not show the
original diff once it has been merged, so the diff is reproduced below
for the sake of provenance:
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 029b49e796..853bb972f7 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.HashSet;
@@ -30,6 +29,9 @@
import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
+import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import static
org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotNull;
@@ -47,21 +49,27 @@ public BuilderImpl() {
@Override
public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
- StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
+ checkNotNull(supplier, "supplier cannot not be null");
+
+ StreamletImpl<R> retval = new SupplierStreamlet<>(supplier);
sources.add(retval);
return retval;
}
@Override
public <R> Streamlet<R> newSource(Source<R> generator) {
- StreamletImpl<R> retval =
StreamletImpl.createGeneratorStreamlet(generator);
+ checkNotNull(generator, "generator cannot not be null");
+
+ StreamletImpl<R> retval = new SourceStreamlet<>(generator);
sources.add(retval);
return retval;
}
@Override
public <R> Streamlet<R> newSource(IRichSpout spout) {
- StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
+ checkNotNull(spout, "spout cannot not be null");
+
+ StreamletImpl<R> retval = new SpoutStreamlet<>(spout);
sources.add(retval);
return retval;
}
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index f382bcd4cf..7abf9e2cf8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.ArrayList;
@@ -25,7 +24,6 @@
import java.util.Set;
import java.util.logging.Logger;
-import org.apache.heron.api.spout.IRichSpout;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
@@ -36,10 +34,8 @@
import org.apache.heron.streamlet.SerializableConsumer;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.SerializablePredicate;
-import org.apache.heron.streamlet.SerializableSupplier;
import org.apache.heron.streamlet.SerializableTransformer;
import org.apache.heron.streamlet.Sink;
-import org.apache.heron.streamlet.Source;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet;
@@ -53,9 +49,6 @@
import
org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
-import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -252,36 +245,6 @@ private String defaultNameCalculator(StreamletNamePrefix
prefix, Set<String> sta
return calculatedName;
}
- /**
- * Create a Streamlet based on the supplier function
- * @param supplier The Supplier function to generate the elements
- */
- static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T>
supplier) {
- checkNotNull(supplier, "supplier cannot not be null");
-
- return new SupplierStreamlet<T>(supplier);
- }
-
- /**
- * Create a Streamlet based on the generator function
- * @param generator The Generator function to generate the elements
- */
- static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
- checkNotNull(generator, "generator cannot not be null");
-
- return new SourceStreamlet<T>(generator);
- }
-
- /**
- * Create a Streamlet based on a Spout object
- * @param spout The Spout function to generate the elements
- */
- static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
- checkNotNull(spout, "spout cannot not be null");
-
- return new SpoutStreamlet<T>(spout);
- }
-
/**
* Return a new Streamlet by applying mapFn to each element of this Streamlet
* @param mapFn The Map Function that should be applied to each element
diff --git
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index a9b15bdaa3..2237bcebc4 100644
---
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
import java.util.Arrays;
@@ -34,6 +33,7 @@
import org.apache.heron.resource.TestBolt;
import org.apache.heron.resource.TestSpout;
import org.apache.heron.resource.TestWindowBolt;
+import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Context;
import org.apache.heron.streamlet.IStreamletBasicOperator;
@@ -65,9 +65,11 @@
*/
public class StreamletImplTest {
+ private Builder builder = Builder.newBuilder();
+
@Test
- public void testBasicParams() throws Exception {
- Streamlet<Double> sample = StreamletImpl.createSupplierStreamlet(() ->
Math.random());
+ public void testBasicParams() {
+ Streamlet<Double> sample = builder.newSource(() -> Math.random());
sample.setName("MyStreamlet");
sample.setNumPartitions(20);
assertEquals("MyStreamlet", sample.getName());
@@ -82,22 +84,22 @@ public void testBasicParams() throws Exception {
}
@Test
- public void testSupplierStreamlet() throws Exception {
- Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() ->
Math.random());
+ public void testSupplierStreamlet() {
+ Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
assertTrue(streamlet instanceof SupplierStreamlet);
}
@Test
- public void testSpoutStreamlet() throws Exception {
+ public void testSpoutStreamlet() {
TestSpout spout = new TestSpout();
- Streamlet<Double> streamlet = StreamletImpl.createSpoutStreamlet(spout);
+ Streamlet<Double> streamlet = builder.newSource(spout);
assertTrue(streamlet instanceof SpoutStreamlet);
}
@Test
@SuppressWarnings("unchecked")
- public void testMapStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testMapStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20).map((num)
-> num * 10);
assertTrue(streamlet instanceof MapStreamlet);
MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>)
streamlet;
@@ -109,8 +111,8 @@ public void testMapStreamlet() throws Exception {
@Test
@SuppressWarnings("unchecked")
- public void testFlatMapStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testFlatMapStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.flatMap((num) ->
Arrays.asList(num * 10));
assertTrue(streamlet instanceof FlatMapStreamlet);
@@ -123,8 +125,8 @@ public void testFlatMapStreamlet() throws Exception {
@Test
@SuppressWarnings("unchecked")
- public void testFilterStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testFilterStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet =
baseStreamlet.setNumPartitions(20).filter((num) -> num != 0);
assertTrue(streamlet instanceof FilterStreamlet);
FilterStreamlet<Double> mStreamlet = (FilterStreamlet<Double>) streamlet;
@@ -136,8 +138,8 @@ public void testFilterStreamlet() throws Exception {
@Test
@SuppressWarnings("unchecked")
- public void testRepartitionStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testRepartitionStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet =
baseStreamlet.setNumPartitions(20).repartition(40);
assertTrue(streamlet instanceof MapStreamlet);
MapStreamlet<Double, Double> mStreamlet = (MapStreamlet<Double, Double>)
streamlet;
@@ -151,7 +153,7 @@ public void testRepartitionStreamlet() throws Exception {
@Test
@SuppressWarnings("unchecked")
public void testCloneStreamlet() {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
List<Streamlet<Double>> streamlets =
baseStreamlet.setNumPartitions(20).clone(2);
assertEquals(streamlets.size(), 2);
assertTrue(streamlets.get(0) instanceof MapStreamlet);
@@ -164,9 +166,9 @@ public void testCloneStreamlet() {
@Test
@SuppressWarnings("unchecked")
- public void testUnionStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet1 =
StreamletImpl.createSupplierStreamlet(() -> Math.random());
- Streamlet<Double> baseStreamlet2 =
StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ public void testUnionStreamlet() {
+ Streamlet<Double> baseStreamlet1 = builder.newSource(() -> Math.random());
+ Streamlet<Double> baseStreamlet2 = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet1.union(baseStreamlet2);
assertTrue(streamlet instanceof UnionStreamlet);
SupplierStreamlet<Double> supplierStreamlet1 = (SupplierStreamlet<Double>)
baseStreamlet1;
@@ -179,8 +181,8 @@ public void testUnionStreamlet() throws Exception {
@Test
@SuppressWarnings("unchecked")
- public void testTransformStreamlet() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testTransformStreamlet() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet =
baseStreamlet.transform(new SerializableTransformer<Double, Double>() {
@Override
@@ -209,8 +211,8 @@ public void cleanup() {
@Test
@SuppressWarnings("unchecked")
- public void testCustomStreamletFromBolt() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testCustomStreamletFromBolt() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new
MyBoltOperator());
assertTrue(streamlet instanceof CustomStreamlet);
@@ -227,8 +229,8 @@ public void testCustomStreamletFromBolt() throws Exception {
@Test
@SuppressWarnings("unchecked")
- public void testCustomStreamletFromBasicBolt() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testCustomStreamletFromBasicBolt() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new
MyBasicBoltOperator());
assertTrue(streamlet instanceof CustomStreamlet);
@@ -246,8 +248,8 @@ public void testCustomStreamletFromBasicBolt() throws
Exception {
@Test
@SuppressWarnings("unchecked")
- public void testCustomStreamletFromWindowBolt() throws Exception {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ public void testCustomStreamletFromWindowBolt() {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
.applyOperator(new
MyWindowBoltOperator());
assertTrue(streamlet instanceof CustomStreamlet);
@@ -261,16 +263,16 @@ public void testCustomStreamletFromWindowBolt() throws
Exception {
@Test
@SuppressWarnings("unchecked")
- public void testSimpleBuild() throws Exception {
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> "sa re ga ma");
+ public void testSimpleBuild() {
+ Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
baseStreamlet.flatMap(x -> Arrays.asList(x.split(" ")))
.reduceByKeyAndWindow(x -> x, x -> 1,
WindowConfig.TumblingCountWindow(10),
(x, y) -> x + y);
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>)
baseStreamlet;
assertFalse(supplierStreamlet.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet.build(builder, stageNames);
+ supplierStreamlet.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet.allBuilt());
assertEquals(supplierStreamlet.getChildren().size(), 1);
assertTrue(supplierStreamlet.getChildren().get(0) instanceof
FlatMapStreamlet);
@@ -286,14 +288,14 @@ public void testSimpleBuild() throws Exception {
@Test
@SuppressWarnings("unchecked")
- public void testComplexBuild() throws Exception {
+ public void testComplexBuild() {
// First source
- Streamlet<String> baseStreamlet1 =
StreamletImpl.createSupplierStreamlet(() -> "sa re ga ma");
+ Streamlet<String> baseStreamlet1 = builder.newSource(() -> "sa re ga ma");
Streamlet<String> leftStream =
baseStreamlet1.flatMap(x -> Arrays.asList(x.split(" ")));
// Second source
- Streamlet<String> baseStreamlet2 =
StreamletImpl.createSupplierStreamlet(() -> "I Love You");
+ Streamlet<String> baseStreamlet2 = builder.newSource(() -> "I Love You");
Streamlet<String> rightStream =
baseStreamlet2.flatMap(x -> Arrays.asList(x.split(" ")));
@@ -305,13 +307,13 @@ public void testComplexBuild() throws Exception {
SupplierStreamlet<String> supplierStreamlet2 = (SupplierStreamlet<String>)
baseStreamlet2;
assertFalse(supplierStreamlet1.isBuilt());
assertFalse(supplierStreamlet2.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet1.build(builder, stageNames);
+ supplierStreamlet1.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet1.isBuilt());
assertFalse(supplierStreamlet1.allBuilt());
- supplierStreamlet2.build(builder, stageNames);
+ supplierStreamlet2.build(topologyBuilder, stageNames);
assertTrue(supplierStreamlet1.allBuilt());
assertTrue(supplierStreamlet2.allBuilt());
@@ -341,7 +343,7 @@ public void testComplexBuild() throws Exception {
@SuppressWarnings("unchecked")
public void testCalculatedDefaultStageNames() {
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>)
baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 0);
@@ -351,9 +353,9 @@ public void testCalculatedDefaultStageNames() {
// build SupplierStreamlet
assertFalse(supplierStreamlet.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet.build(builder, stageNames);
+ supplierStreamlet.build(topologyBuilder, stageNames);
// verify SupplierStreamlet
assertTrue(supplierStreamlet.allBuilt());
@@ -396,7 +398,7 @@ public void testConfigBuilder() {
@Test
public void testDefaultStreamletNameIfNotSet() {
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>)
baseStreamlet;
Set<String> stageNames = new HashSet<>();
@@ -414,7 +416,7 @@ public void testDefaultStreamletNameIfNotSet() {
public void testStreamletNameIfAlreadySet() {
String supplierName = "MyStringSupplier";
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>)
baseStreamlet;
supplierStreamlet.setName(supplierName);
@@ -432,7 +434,7 @@ public void testStreamletNameIfAlreadySet() {
@Test(expected = RuntimeException.class)
public void testStreamletNameIfDuplicateNameIsSet() {
// create SupplierStreamlet
- Streamlet<String> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
->
+ Streamlet<String> baseStreamlet = builder.newSource(() ->
"This is test content");
SupplierStreamlet<String> supplierStreamlet = (SupplierStreamlet<String>)
baseStreamlet;
@@ -444,14 +446,14 @@ public void testStreamletNameIfDuplicateNameIsSet() {
// build SupplierStreamlet
assertFalse(supplierStreamlet.isBuilt());
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
Set<String> stageNames = new HashSet<>();
- supplierStreamlet.build(builder, stageNames);
+ supplierStreamlet.build(topologyBuilder, stageNames);
}
@Test
public void testSetNameWithInvalidValues() {
- Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() ->
Math.random());
+ Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
Function<String, Streamlet<Double>> function = streamlet::setName;
testByFunction(function, null);
testByFunction(function, "");
@@ -460,14 +462,14 @@ public void testSetNameWithInvalidValues() {
@Test(expected = IllegalArgumentException.class)
public void testSetNumPartitionsWithInvalidValue() {
- Streamlet<Double> streamlet = StreamletImpl.createSupplierStreamlet(() ->
Math.random());
+ Streamlet<Double> streamlet = builder.newSource(() -> Math.random());
streamlet.setNumPartitions(0);
}
@Test(expected = IllegalArgumentException.class)
@SuppressWarnings("unchecked")
public void testCloneStreamletWithInvalidNumberOfClone() {
- Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(()
-> Math.random());
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
baseStreamlet.setNumPartitions(20).clone(0);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services