This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 38060b0 Nwang/add streamlet operator interfaces with grouping v2
(#3099)
38060b0 is described below
commit 38060b0166c57d3d5a1bbcbba50e9e8f708599f6
Author: Ning Wang <[email protected]>
AuthorDate: Wed Nov 21 16:40:02 2018 -0800
Nwang/add streamlet operator interfaces with grouping v2 (#3099)
* Add grouping support for Streamlet.applyOperator()
* Add support in Scala
* Simplify grouping support in Streamlet to allow only one incoming stream
---
.../apache/heron/api/topology/BoltDeclarer.java | 4 ++++
.../java/org/apache/heron/streamlet/Streamlet.java | 10 ++++++++
.../apache/heron/streamlet/impl/StreamletImpl.java | 26 +++++++++++++++++----
.../impl/streamlets/ConsumerStreamlet.java | 6 +++++
.../streamlet/impl/streamlets/CustomStreamlet.java | 27 ++++++++++++++++++----
.../streamlet/impl/streamlets/LogStreamlet.java | 8 ++++++-
.../apache/heron/streamlet/scala/Streamlet.scala | 10 ++++++++
.../heron/streamlet/scala/impl/StreamletImpl.scala | 14 +++++++++++
.../heron/streamlet/impl/StreamletImplTest.java | 18 ++++++++++++++-
9 files changed, 112 insertions(+), 11 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
index 9cddddf..8c569e3 100644
--- a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
+++ b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
@@ -147,6 +147,10 @@ public class BoltDeclarer extends
BaseComponentDeclarer<BoltDeclarer> {
return grouping(componentName, streamId, grouper);
}
+ public BoltDeclarer grouping(String componentName, StreamGrouping grouper) {
+ return grouping(componentName, Utils.DEFAULT_STREAM_ID, grouper);
+ }
+
public BoltDeclarer grouping(
String componentName,
String streamId,
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index b5fe931..821fa51 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -21,6 +21,7 @@ package org.apache.heron.streamlet;
import java.util.List;
+import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.classification.InterfaceStability;
/**
@@ -210,6 +211,15 @@ public interface Streamlet<R> {
<T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
/**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator,
StreamGrouping grouper);
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation
returns void
*/
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 7abf9e2..e76cd26 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Set;
import java.util.logging.Logger;
+import org.apache.heron.api.grouping.NoneStreamGrouping;
+import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
@@ -317,8 +319,7 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
*/
@Override
public List<Streamlet<R>> clone(int numClones) {
- require(numClones > 0,
- "Streamlet's clone number should be > 0");
+ require(numClones > 0, "Streamlet's clone number should be > 0");
List<Streamlet<R>> retval = new ArrayList<>(numClones);
for (int i = 0; i < numClones; ++i) {
retval.add(repartition(getNumPartitions()));
@@ -522,9 +523,26 @@ public abstract class StreamletImpl<R> implements
Streamlet<R> {
public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
checkNotNull(operator, "operator cannot be null");
- StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
+ // By default, NoneStreamGrouping strategy is used. In this strategy, tuples
are forwarded
+ // from parent component to a random one of all the instances of the child
component,
+ // which is the same logic as shuffle grouping.
+ return applyOperator(operator, new NoneStreamGrouping());
+ }
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ @Override
+ public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator,
StreamGrouping grouper) {
+ checkNotNull(operator, "operator can't be null");
+ checkNotNull(grouper, "grouper can't be null");
+
+ StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator,
grouper);
addChild(customStreamlet);
return customStreamlet;
}
-
}
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index 9426001..98bb846 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -42,6 +42,12 @@ public class ConsumerStreamlet<R> extends StreamletImpl<R> {
setNumPartitions(parent.getNumPartitions());
}
+ /**
+ * Connect this streamlet to TopologyBuilder.
+ * @param bldr The TopologyBuilder for the topology
+ * @param stageNames The existing stage names
+ * @return True if successful
+ */
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.CONSUMER, stageNames);
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index dbeea14..d09b134 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -21,6 +21,8 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
+import org.apache.heron.api.grouping.StreamGrouping;
+import org.apache.heron.api.topology.BoltDeclarer;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletBasicOperator;
import org.apache.heron.streamlet.IStreamletOperator;
@@ -35,37 +37,52 @@ import org.apache.heron.streamlet.impl.StreamletImpl;
public class CustomStreamlet<R, T> extends StreamletImpl<T> {
private StreamletImpl<R> parent;
private IStreamletOperator<R, T> operator;
+ private StreamGrouping grouper;
/**
* Create a custom streamlet from user defined CustomOperator object.
* @param parent The parent(upstream) streamlet object
- * @param operator The user defined CustomOperator
+ * @param operator The user defined CustomOperator
+ * @param grouper The StreamGrouping to be used with the operator
*/
public CustomStreamlet(StreamletImpl<R> parent,
- IStreamletOperator<R, T> operator) {
+ IStreamletOperator<R, T> operator,
+ StreamGrouping grouper) {
this.parent = parent;
this.operator = operator;
+ this.grouper = grouper;
setNumPartitions(parent.getNumPartitions());
}
+ /**
+ * Connect this streamlet to TopologyBuilder.
+ * @param bldr The TopologyBuilder for the topology
+ * @param stageNames The existing stage names
+ * @return True if successful
+ */
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ // Create and set bolt
+ BoltDeclarer declarer;
if (operator instanceof IStreamletBasicOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>)
operator;
- bldr.setBolt(getName(), op,
getNumPartitions()).shuffleGrouping(parent.getName());
+ declarer = bldr.setBolt(getName(), op, getNumPartitions());
} else if (operator instanceof IStreamletRichOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>)
operator;
- bldr.setBolt(getName(), op,
getNumPartitions()).shuffleGrouping(parent.getName());
+ declarer = bldr.setBolt(getName(), op, getNumPartitions());
} else if (operator instanceof IStreamletWindowOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>)
operator;
- bldr.setBolt(getName(), op,
getNumPartitions()).shuffleGrouping(parent.getName());
+ declarer = bldr.setBolt(getName(), op, getNumPartitions());
} else {
throw new RuntimeException("Unhandled operator class is found!");
}
+ // Apply grouping
+ declarer.grouping(parent.getName(), grouper);
+
return true;
}
}
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index 707751f..f29215b 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
@@ -39,11 +38,18 @@ public class LogStreamlet<R> extends StreamletImpl<R> {
setNumPartitions(parent.getNumPartitions());
}
+ /**
+ * Connect this streamlet to TopologyBuilder.
+ * @param bldr The TopologyBuilder for the topology
+ * @param stageNames The existing stage names
+ * @return True if successful
+ */
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.LOGGER, stageNames);
bldr.setBolt(getName(), new LogSink<R>(),
getNumPartitions()).shuffleGrouping(parent.getName());
+
return true;
}
}
diff --git
a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 5a11c75..8200316 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -18,6 +18,7 @@
*/
package org.apache.heron.streamlet.scala
+import org.apache.heron.api.grouping.StreamGrouping
import org.apache.heron.streamlet.{
IStreamletOperator,
JoinType,
@@ -232,6 +233,15 @@ trait Streamlet[R] {
def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
/**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @tparam T The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ def applyOperator[T](operator: IStreamletOperator[R, T], grouper:
StreamGrouping): Streamlet[T]
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation
returns void
*/
diff --git
a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 44b6ede..50c655e 100644
---
a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++
b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -20,6 +20,7 @@ package org.apache.heron.streamlet.scala.impl
import scala.collection.JavaConverters
+import org.apache.heron.api.grouping.StreamGrouping
import org.apache.heron.streamlet.{
IStreamletOperator,
JoinType,
@@ -331,6 +332,19 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
}
/**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @tparam T The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ override def applyOperator[T](operator: IStreamletOperator[R, T],
+ grouper: StreamGrouping): Streamlet[T] = {
+ val newJavaStreamlet = javaStreamlet.applyOperator[T](operator, grouper)
+ fromJavaStreamlet(newJavaStreamlet)
+ }
+
+ /**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation
returns void
*/
diff --git
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 2237bce..7a76dfc 100644
---
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
import org.junit.Test;
+import org.apache.heron.api.grouping.ShuffleStreamGrouping;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.resource.TestBasicBolt;
@@ -223,6 +224,21 @@ public class StreamletImplTest {
assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCustomStreamletWithGrouperFromBolt() throws Exception {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+ Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+ .applyOperator(new
MyBoltOperator(),
+ new
ShuffleStreamGrouping());
+ assertTrue(streamlet instanceof CustomStreamlet);
+ CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double,
Double>) streamlet;
+ assertEquals(20, mStreamlet.getNumPartitions());
+ SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>)
baseStreamlet;
+ assertEquals(supplierStreamlet.getChildren().size(), 1);
+ assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ }
+
private class MyBasicBoltOperator extends TestBasicBolt
implements IStreamletBasicOperator<Double, Double> {
}
@@ -263,7 +279,7 @@ public class StreamletImplTest {
@Test
@SuppressWarnings("unchecked")
- public void testSimpleBuild() {
+ public void testSimpleBuild() throws Exception {
Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
baseStreamlet.flatMap(x -> Arrays.asList(x.split(" ")))
.reduceByKeyAndWindow(x -> x, x -> 1,
WindowConfig.TumblingCountWindow(10),