nwangtw closed pull request #3099: Nwang/add streamlet operator interfaces with
grouping v2
URL: https://github.com/apache/incubator-heron/pull/3099
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
index 9cddddf268..8c569e3041 100644
--- a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
+++ b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
@@ -147,6 +147,10 @@ public BoltDeclarer customGrouping(
return grouping(componentName, streamId, grouper);
}
+ public BoltDeclarer grouping(String componentName, StreamGrouping grouper) {
+ return grouping(componentName, Utils.DEFAULT_STREAM_ID, grouper);
+ }
+
public BoltDeclarer grouping(
String componentName,
String streamId,
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index b5fe93162f..821fa5143a 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -21,6 +21,7 @@
import java.util.List;
+import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.classification.InterfaceStability;
/**
@@ -209,6 +210,15 @@
*/
<T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator,
StreamGrouping grouper);
+
/**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation
returns void
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 7abf9e2cf8..e76cd264cf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.logging.Logger;
+import org.apache.heron.api.grouping.NoneStreamGrouping;
+import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletOperator;
import org.apache.heron.streamlet.JoinType;
@@ -317,8 +319,7 @@ private String defaultNameCalculator(StreamletNamePrefix
prefix, Set<String> sta
*/
@Override
public List<Streamlet<R>> clone(int numClones) {
- require(numClones > 0,
- "Streamlet's clone number should be > 0");
+ require(numClones > 0, "Streamlet's clone number should be > 0");
List<Streamlet<R>> retval = new ArrayList<>(numClones);
for (int i = 0; i < numClones; ++i) {
retval.add(repartition(getNumPartitions()));
@@ -522,9 +523,26 @@ public void toSink(Sink<R> sink) {
public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
checkNotNull(operator, "operator cannot be null");
- StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
+ // By default, NoneStreamGrouping strategy is used. In this strategy, tuples
are forwarded
+ // from parent component to a random one of all the instances of the child
component,
+ // which is the same logic as shuffle grouping.
+ return applyOperator(operator, new NoneStreamGrouping());
+ }
+
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @param <T> The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ @Override
+ public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator,
StreamGrouping grouper) {
+ checkNotNull(operator, "operator can't be null");
+ checkNotNull(grouper, "grouper can't be null");
+
+ StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator,
grouper);
addChild(customStreamlet);
return customStreamlet;
}
-
}
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index 9426001986..98bb8466ee 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -42,6 +42,12 @@ public ConsumerStreamlet(StreamletImpl<R> parent,
SerializableConsumer<R> consum
setNumPartitions(parent.getNumPartitions());
}
+ /**
+ * Connect this streamlet to TopologyBuilder.
+ * @param bldr The TopologyBuilder for the topology
+ * @param stageNames The existing stage names
+ * @return True if successful
+ */
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.CONSUMER, stageNames);
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index dbeea14a2e..d09b1344bb 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -21,6 +21,8 @@
import java.util.Set;
+import org.apache.heron.api.grouping.StreamGrouping;
+import org.apache.heron.api.topology.BoltDeclarer;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.IStreamletBasicOperator;
import org.apache.heron.streamlet.IStreamletOperator;
@@ -35,37 +37,52 @@
public class CustomStreamlet<R, T> extends StreamletImpl<T> {
private StreamletImpl<R> parent;
private IStreamletOperator<R, T> operator;
+ private StreamGrouping grouper;
/**
* Create a custom streamlet from user defined CustomOperator object.
* @param parent The parent(upstream) streamlet object
- * @param operator The user defined CustomOperator
+ * @param operator The user defined CustomOperator
+ * @param grouper The StreamGrouping to be used with the operator
*/
public CustomStreamlet(StreamletImpl<R> parent,
- IStreamletOperator<R, T> operator) {
+ IStreamletOperator<R, T> operator,
+ StreamGrouping grouper) {
this.parent = parent;
this.operator = operator;
+ this.grouper = grouper;
setNumPartitions(parent.getNumPartitions());
}
+ /**
+ * Connect this streamlet to TopologyBuilder.
+ * @param bldr The TopologyBuilder for the topology
+ * @param stageNames The existing stage names
+ * @return True if successful
+ */
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+ // Create and set bolt
+ BoltDeclarer declarer;
if (operator instanceof IStreamletBasicOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>)
operator;
- bldr.setBolt(getName(), op,
getNumPartitions()).shuffleGrouping(parent.getName());
+ declarer = bldr.setBolt(getName(), op, getNumPartitions());
} else if (operator instanceof IStreamletRichOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>)
operator;
- bldr.setBolt(getName(), op,
getNumPartitions()).shuffleGrouping(parent.getName());
+ declarer = bldr.setBolt(getName(), op, getNumPartitions());
} else if (operator instanceof IStreamletWindowOperator) {
setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>)
operator;
- bldr.setBolt(getName(), op,
getNumPartitions()).shuffleGrouping(parent.getName());
+ declarer = bldr.setBolt(getName(), op, getNumPartitions());
} else {
throw new RuntimeException("Unhandled operator class is found!");
}
+ // Apply grouping
+ declarer.grouping(parent.getName(), grouper);
+
return true;
}
}
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index 707751f5da..f29215bbf9 100644
---
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
@@ -39,11 +38,18 @@ public LogStreamlet(StreamletImpl<R> parent) {
setNumPartitions(parent.getNumPartitions());
}
+ /**
+ * Connect this streamlet to TopologyBuilder.
+ * @param bldr The TopologyBuilder for the topology
+ * @param stageNames The existing stage names
+ * @return True if successful
+ */
@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefix.LOGGER, stageNames);
bldr.setBolt(getName(), new LogSink<R>(),
getNumPartitions()).shuffleGrouping(parent.getName());
+
return true;
}
}
diff --git
a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 5a11c75855..8200316f7b 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -18,6 +18,7 @@
*/
package org.apache.heron.streamlet.scala
+import org.apache.heron.api.grouping.StreamGrouping
import org.apache.heron.streamlet.{
IStreamletOperator,
JoinType,
@@ -231,6 +232,15 @@ trait Streamlet[R] {
*/
def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @tparam T The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ def applyOperator[T](operator: IStreamletOperator[R, T], grouper:
StreamGrouping): Streamlet[T]
+
/**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation
returns void
diff --git
a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 44b6ede305..50c655efb2 100644
---
a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++
b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -20,6 +20,7 @@ package org.apache.heron.streamlet.scala.impl
import scala.collection.JavaConverters
+import org.apache.heron.api.grouping.StreamGrouping
import org.apache.heron.streamlet.{
IStreamletOperator,
JoinType,
@@ -330,6 +331,19 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
fromJavaStreamlet(newJavaStreamlet)
}
+ /**
+ * Returns a new Streamlet by applying the operator on each element of this
streamlet.
+ * @param operator The operator to be applied
+ * @param grouper The grouper to be applied with the operator
+ * @tparam T The return type of the transform
+ * @return Streamlet containing the output of the operation
+ */
+ override def applyOperator[T](operator: IStreamletOperator[R, T],
+ grouper: StreamGrouping): Streamlet[T] = {
+ val newJavaStreamlet = javaStreamlet.applyOperator[T](operator, grouper)
+ fromJavaStreamlet(newJavaStreamlet)
+ }
+
/**
* Logs every element of the streamlet using String.valueOf function
* This is one of the sink functions in the sense that this operation
returns void
diff --git
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 2237bcebc4..7a76dfc9ce 100644
---
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -27,6 +27,7 @@
import org.junit.Test;
+import org.apache.heron.api.grouping.ShuffleStreamGrouping;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.resource.TestBasicBolt;
@@ -223,6 +224,21 @@ public void testCustomStreamletFromBolt() {
assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCustomStreamletWithGrouperFromBolt() throws Exception {
+ Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+ Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+ .applyOperator(new
MyBoltOperator(),
+ new
ShuffleStreamGrouping());
+ assertTrue(streamlet instanceof CustomStreamlet);
+ CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double,
Double>) streamlet;
+ assertEquals(20, mStreamlet.getNumPartitions());
+ SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>)
baseStreamlet;
+ assertEquals(supplierStreamlet.getChildren().size(), 1);
+ assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ }
+
private class MyBasicBoltOperator extends TestBasicBolt
implements IStreamletBasicOperator<Double, Double> {
}
@@ -263,7 +279,7 @@ public void testCustomStreamletFromWindowBolt() {
@Test
@SuppressWarnings("unchecked")
- public void testSimpleBuild() {
+ public void testSimpleBuild() throws Exception {
Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
baseStreamlet.flatMap(x -> Arrays.asList(x.split(" ")))
.reduceByKeyAndWindow(x -> x, x -> 1,
WindowConfig.TumblingCountWindow(10),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services