This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 38060b0  Nwang/add streamlet operator interfaces with grouping v2 
(#3099)
38060b0 is described below

commit 38060b0166c57d3d5a1bbcbba50e9e8f708599f6
Author: Ning Wang <[email protected]>
AuthorDate: Wed Nov 21 16:40:02 2018 -0800

    Nwang/add streamlet operator interfaces with grouping v2 (#3099)
    
    * Add grouping support for Streamlet.applyOperator()
    
    * Add support in Scala
    
    * Simplify grouping support in Streamlet to allow only one incoming stream
---
 .../apache/heron/api/topology/BoltDeclarer.java    |  4 ++++
 .../java/org/apache/heron/streamlet/Streamlet.java | 10 ++++++++
 .../apache/heron/streamlet/impl/StreamletImpl.java | 26 +++++++++++++++++----
 .../impl/streamlets/ConsumerStreamlet.java         |  6 +++++
 .../streamlet/impl/streamlets/CustomStreamlet.java | 27 ++++++++++++++++++----
 .../streamlet/impl/streamlets/LogStreamlet.java    |  8 ++++++-
 .../apache/heron/streamlet/scala/Streamlet.scala   | 10 ++++++++
 .../heron/streamlet/scala/impl/StreamletImpl.scala | 14 +++++++++++
 .../heron/streamlet/impl/StreamletImplTest.java    | 18 ++++++++++++++-
 9 files changed, 112 insertions(+), 11 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java 
b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
index 9cddddf..8c569e3 100644
--- a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
+++ b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
@@ -147,6 +147,10 @@ public class BoltDeclarer extends 
BaseComponentDeclarer<BoltDeclarer> {
     return grouping(componentName, streamId, grouper);
   }
 
+  public BoltDeclarer grouping(String componentName, StreamGrouping grouper) {
+    return grouping(componentName, Utils.DEFAULT_STREAM_ID, grouper);
+  }
+
   public BoltDeclarer grouping(
       String componentName,
       String streamId,
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java 
b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
index b5fe931..821fa51 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java
@@ -21,6 +21,7 @@ package org.apache.heron.streamlet;
 
 import java.util.List;
 
+import org.apache.heron.api.grouping.StreamGrouping;
 import org.apache.heron.classification.InterfaceStability;
 
 /**
@@ -210,6 +211,15 @@ public interface Streamlet<R> {
   <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator);
 
   /**
+   * Returns a new Streamlet by applying the operator on each element of this 
streamlet.
+   * @param operator The operator to be applied
+   * @param grouper The grouper to be applied with the operator
+   * @param <T> The return type of the transform
+   * @return Streamlet containing the output of the operation
+   */
+  <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator, 
StreamGrouping grouper);
+
+  /**
    * Logs every element of the streamlet using String.valueOf function
    * This is one of the sink functions in the sense that this operation 
returns void
    */
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 7abf9e2..e76cd26 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.logging.Logger;
 
+import org.apache.heron.api.grouping.NoneStreamGrouping;
+import org.apache.heron.api.grouping.StreamGrouping;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
@@ -317,8 +319,7 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
    */
   @Override
   public List<Streamlet<R>> clone(int numClones) {
-    require(numClones > 0,
-        "Streamlet's clone number should be > 0");
+    require(numClones > 0, "Streamlet's clone number should be > 0");
     List<Streamlet<R>> retval = new ArrayList<>(numClones);
     for (int i = 0; i < numClones; ++i) {
       retval.add(repartition(getNumPartitions()));
@@ -522,9 +523,26 @@ public abstract class StreamletImpl<R> implements 
Streamlet<R> {
   public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
     checkNotNull(operator, "operator cannot be null");
 
-    StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator);
+    // By default, NoneStreamGrouping strategy is used. In this strategy, tuples 
are forwarded
+    // from parent component to a random one of all the instances of the child 
component,
+    // which is the same logic as shuffle grouping.
+    return applyOperator(operator, new NoneStreamGrouping());
+  }
+
+  /**
+   * Returns a new Streamlet by applying the operator on each element of this 
streamlet.
+   * @param operator The operator to be applied
+   * @param grouper The grouper to be applied with the operator
+   * @param <T> The return type of the transform
+   * @return Streamlet containing the output of the operation
+   */
+  @Override
+  public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator, 
StreamGrouping grouper) {
+    checkNotNull(operator, "operator can't be null");
+    checkNotNull(grouper, "grouper can't be null");
+
+    StreamletImpl<T> customStreamlet = new CustomStreamlet<>(this, operator, 
grouper);
     addChild(customStreamlet);
     return customStreamlet;
   }
-
 }
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
index 9426001..98bb846 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ConsumerStreamlet.java
@@ -42,6 +42,12 @@ public class ConsumerStreamlet<R> extends StreamletImpl<R> {
     setNumPartitions(parent.getNumPartitions());
   }
 
+  /**
+   * Connect this streamlet to TopologyBuilder.
+   * @param bldr The TopologyBuilder for the topology
+   * @param stageNames The existing stage names
+   * @return True if successful
+   */
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.CONSUMER, stageNames);
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
index dbeea14..d09b134 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java
@@ -21,6 +21,8 @@ package org.apache.heron.streamlet.impl.streamlets;
 
 import java.util.Set;
 
+import org.apache.heron.api.grouping.StreamGrouping;
+import org.apache.heron.api.topology.BoltDeclarer;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.IStreamletBasicOperator;
 import org.apache.heron.streamlet.IStreamletOperator;
@@ -35,37 +37,52 @@ import org.apache.heron.streamlet.impl.StreamletImpl;
 public class CustomStreamlet<R, T> extends StreamletImpl<T> {
   private StreamletImpl<R> parent;
   private IStreamletOperator<R, T> operator;
+  private StreamGrouping grouper;
 
   /**
    * Create a custom streamlet from user defined CustomOperator object.
    * @param parent The parent(upstream) streamlet object
-   * @param operator The user defined CustomOperator
+   * @param operator The user defined CustomOperator
+   * @param grouper The StreamGrouper to be used with the operator
    */
   public CustomStreamlet(StreamletImpl<R> parent,
-                         IStreamletOperator<R, T> operator) {
+                         IStreamletOperator<R, T> operator,
+                         StreamGrouping grouper) {
     this.parent = parent;
     this.operator = operator;
+    this.grouper = grouper;
     setNumPartitions(parent.getNumPartitions());
   }
 
+  /**
+   * Connect this streamlet to TopologyBuilder.
+   * @param bldr The TopologyBuilder for the topology
+   * @param stageNames The existing stage names
+   * @return True if successful
+   */
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    // Create and set bolt
+    BoltDeclarer declarer;
     if (operator instanceof IStreamletBasicOperator) {
       setDefaultNameIfNone(StreamletNamePrefix.CUSTOM, stageNames);
       IStreamletBasicOperator<R, T> op = (IStreamletBasicOperator<R, T>) 
operator;
-      bldr.setBolt(getName(), op,  
getNumPartitions()).shuffleGrouping(parent.getName());
+      declarer = bldr.setBolt(getName(), op,  getNumPartitions());
     } else if (operator instanceof IStreamletRichOperator) {
       setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_BASIC, stageNames);
       IStreamletRichOperator<R, T> op = (IStreamletRichOperator<R, T>) 
operator;
-      bldr.setBolt(getName(), op,  
getNumPartitions()).shuffleGrouping(parent.getName());
+      declarer = bldr.setBolt(getName(), op,  getNumPartitions());
     } else if (operator instanceof IStreamletWindowOperator) {
       setDefaultNameIfNone(StreamletNamePrefix.CUSTOM_WINDOW, stageNames);
       IStreamletWindowOperator<R, T> op = (IStreamletWindowOperator<R, T>) 
operator;
-      bldr.setBolt(getName(), op,  
getNumPartitions()).shuffleGrouping(parent.getName());
+      declarer = bldr.setBolt(getName(), op,  getNumPartitions());
     } else {
       throw new RuntimeException("Unhandled operator class is found!");
     }
 
+    // Apply grouping
+    declarer.grouping(parent.getName(), grouper);
+
     return true;
   }
 }
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
index 707751f..f29215b 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.streamlets;
 
 import java.util.Set;
@@ -39,11 +38,18 @@ public class LogStreamlet<R> extends StreamletImpl<R> {
     setNumPartitions(parent.getNumPartitions());
   }
 
+  /**
+   * Connect this streamlet to TopologyBuilder.
+   * @param bldr The TopologyBuilder for the topology
+   * @param stageNames The existing stage names
+   * @return True if successful
+   */
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.LOGGER, stageNames);
     bldr.setBolt(getName(), new LogSink<R>(),
         getNumPartitions()).shuffleGrouping(parent.getName());
+
     return true;
   }
 }
diff --git 
a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala 
b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
index 5a11c75..8200316 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Streamlet.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.heron.streamlet.scala
 
+import org.apache.heron.api.grouping.StreamGrouping
 import org.apache.heron.streamlet.{
   IStreamletOperator,
   JoinType,
@@ -232,6 +233,15 @@ trait Streamlet[R] {
   def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T]
 
   /**
+   * Returns a new Streamlet by applying the operator on each element of this 
streamlet.
+   * @param operator The operator to be applied
+   * @param grouper The grouper to be applied with the operator
+   * @tparam T The return type of the transform
+   * @return Streamlet containing the output of the operation
+   */
+  def applyOperator[T](operator: IStreamletOperator[R, T], grouper: 
StreamGrouping): Streamlet[T]
+
+  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation 
returns void
     */
diff --git 
a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala 
b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index 44b6ede..50c655e 100644
--- 
a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ 
b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -20,6 +20,7 @@ package org.apache.heron.streamlet.scala.impl
 
 import scala.collection.JavaConverters
 
+import org.apache.heron.api.grouping.StreamGrouping
 import org.apache.heron.streamlet.{
   IStreamletOperator,
   JoinType,
@@ -331,6 +332,19 @@ class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
   }
 
   /**
+   * Returns a new Streamlet by applying the operator on each element of this 
streamlet.
+   * @param operator The operator to be applied
+   * @param grouper The grouper to be applied with the operator
+   * @tparam T The return type of the transform
+   * @return Streamlet containing the output of the operation
+   */
+  override def applyOperator[T](operator: IStreamletOperator[R, T],
+                                grouper: StreamGrouping): Streamlet[T] = {
+    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator, grouper)
+    fromJavaStreamlet(newJavaStreamlet)
+  }
+
+  /**
     * Logs every element of the streamlet using String.valueOf function
     * This is one of the sink functions in the sense that this operation 
returns void
     */
diff --git 
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java 
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 2237bce..7a76dfc 100644
--- 
a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ 
b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
 
 import org.junit.Test;
 
+import org.apache.heron.api.grouping.ShuffleStreamGrouping;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.resource.TestBasicBolt;
@@ -223,6 +224,21 @@ public class StreamletImplTest {
     assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCustomStreamletWithGrouperFromBolt() throws Exception {
+    Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
+    Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
+                                               .applyOperator(new 
MyBoltOperator(),
+                                                              new 
ShuffleStreamGrouping());
+    assertTrue(streamlet instanceof CustomStreamlet);
+    CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, 
Double>) streamlet;
+    assertEquals(20, mStreamlet.getNumPartitions());
+    SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) 
baseStreamlet;
+    assertEquals(supplierStreamlet.getChildren().size(), 1);
+    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+  }
+
   private class MyBasicBoltOperator extends TestBasicBolt
       implements IStreamletBasicOperator<Double, Double> {
   }
@@ -263,7 +279,7 @@ public class StreamletImplTest {
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testSimpleBuild() {
+  public void testSimpleBuild() throws Exception {
     Streamlet<String> baseStreamlet = builder.newSource(() -> "sa re ga ma");
     baseStreamlet.flatMap(x -> Arrays.asList(x.split(" ")))
                  .reduceByKeyAndWindow(x -> x, x -> 1, 
WindowConfig.TumblingCountWindow(10),

Reply via email to