This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new e3be7e4  fix the issue that Heron ECO stream builder does not handle 
IStatefulWindowedBolt (#3235)
e3be7e4 is described below

commit e3be7e41bef1900ec2064b575b76d9eaa8f25cfd
Author: SiMing Weng <[email protected]>
AuthorDate: Fri Apr 19 12:21:10 2019 -0400

    fix the issue that Heron ECO stream builder does not handle 
IStatefulWindowedBolt (#3235)
    
    make Heron ECO stream builder handle IStatefulWindowedBolt
    add unit test
---
 .../heron/eco/builder/heron/StreamBuilder.java     | 15 ++++-
 .../eco/builder/heron/HeronStreamBuilderTest.java  | 65 ++++++++++++++++++++++
 .../src/org/apache/bazel/cppcheck/CppCheck.java    |  5 +-
 3 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java 
b/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
index d2d4530..90fdb58 100644
--- a/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
+++ b/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
@@ -19,6 +19,7 @@
 
 package org.apache.heron.eco.builder.heron;
 
+import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.List;
@@ -26,6 +27,7 @@ import java.util.Map;
 
 import org.apache.heron.api.bolt.IBasicBolt;
 import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.bolt.IStatefulWindowedBolt;
 import org.apache.heron.api.bolt.IWindowedBolt;
 import org.apache.heron.api.grouping.CustomStreamGrouping;
 import org.apache.heron.api.topology.BoltDeclarer;
@@ -42,8 +44,8 @@ import org.apache.heron.eco.definition.StreamDefinition;
 
 public class StreamBuilder {
 
-  protected void buildStreams(EcoExecutionContext executionContext, 
TopologyBuilder builder,
-                              ObjectBuilder objectBuilder)
+  protected <K extends Serializable, V extends Serializable> void buildStreams(
+      EcoExecutionContext executionContext, TopologyBuilder builder, 
ObjectBuilder objectBuilder)
       throws IllegalAccessException, InstantiationException, 
ClassNotFoundException,
       NoSuchFieldException, InvocationTargetException {
     EcoTopologyDefinition topologyDefinition = 
executionContext.getTopologyDefinition();
@@ -68,6 +70,15 @@ public class StreamBuilder {
               topologyDefinition.parallelismForBolt(stream.getTo()));
           declarers.put(stream.getTo(), declarer);
         }
+      } else if (boltObj instanceof IStatefulWindowedBolt) {
+        if (declarer == null) {
+          //noinspection unchecked
+          declarer = builder.setBolt(
+              stream.getTo(),
+              (IStatefulWindowedBolt<K, V>) boltObj,
+              topologyDefinition.parallelismForBolt(stream.getTo()));
+          declarers.put(stream.getTo(), declarer);
+        }
       } else if (boltObj instanceof IWindowedBolt) {
         if (declarer == null) {
           declarer = builder.setBolt(
diff --git 
a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java 
b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
index 47f1b6a..72ea2a8 100644
--- 
a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
+++ 
b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
@@ -32,12 +32,14 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
+import org.apache.heron.api.bolt.BaseStatefulWindowedBolt;
 import org.apache.heron.api.bolt.BasicOutputCollector;
 import org.apache.heron.api.bolt.IBasicBolt;
 import org.apache.heron.api.bolt.IRichBolt;
 import org.apache.heron.api.bolt.IWindowedBolt;
 import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.grouping.CustomStreamGrouping;
+import org.apache.heron.api.state.State;
 import org.apache.heron.api.topology.BoltDeclarer;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.topology.TopologyBuilder;
@@ -262,6 +264,56 @@ public class HeronStreamBuilderTest {
     verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition), 
same(mockContext));
   }
 
+  @Test
+  public void buildStreams_SpoutToIStatefulWindowedBolt() throws 
ClassNotFoundException,
+      InvocationTargetException,
+      NoSuchFieldException,
+      InstantiationException,
+      IllegalAccessException {
+    final int iRichBoltParallelism = 1;
+    final String to = "to";
+    final String from = "from";
+    final String streamId  = "id";
+    StreamDefinition streamDefinition  = new StreamDefinition();
+    streamDefinition.setFrom(from);
+    streamDefinition.setTo(to);
+    streamDefinition.setId(streamId);
+    List<StreamDefinition> streams = new ArrayList<>();
+    streams.add(streamDefinition);
+    GroupingDefinition groupingDefinition = new GroupingDefinition();
+    groupingDefinition.setType(GroupingDefinition.Type.CUSTOM);
+    MockCustomObjectDefinition mockCustomObjectDefinition = new 
MockCustomObjectDefinition();
+
+    groupingDefinition.setCustomClass(mockCustomObjectDefinition);
+    List<String> args = new ArrayList<>();
+    args.add("arg1");
+    groupingDefinition.setArgs(args);
+    groupingDefinition.setStreamId(streamId);
+    streamDefinition.setGrouping(groupingDefinition);
+    MockIStatefulWindowedBolt mockIStatefulWindowedBolt = new 
MockIStatefulWindowedBolt();
+    MockCustomStreamGrouping mockCustomStreamGrouping = new 
MockCustomStreamGrouping();
+
+    when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+    when(mockContext.getBolt(eq(to))).thenReturn(mockIStatefulWindowedBolt);
+    when(mockDefinition.getStreams()).thenReturn(streams);
+    
when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism);
+    when(mockTopologyBuilder.setBolt(eq(to),
+        eq(mockIStatefulWindowedBolt), 
eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer);
+    when(mockObjectBuilder.buildObject(eq(mockCustomObjectDefinition),
+        eq(mockContext))).thenReturn(mockCustomStreamGrouping);
+
+    subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+    verify(mockContext).getTopologyDefinition();
+    verify(mockContext).getBolt(eq(to));
+    verify(mockDefinition).parallelismForBolt(eq(to));
+    verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIStatefulWindowedBolt), 
eq(iRichBoltParallelism));
+    verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId), 
eq(mockCustomStreamGrouping));
+    verify(mockContext).setStreams(anyMap());
+    verify(mockDefinition).getStreams();
+    verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition), 
same(mockContext));
+  }
+
   private class MockCustomObjectDefinition extends ObjectDefinition {
 
   }
@@ -344,6 +396,19 @@ public class HeronStreamBuilderTest {
     }
   }
 
+  private class MockIStatefulWindowedBolt extends 
BaseStatefulWindowedBolt<String, byte[]> {
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+
+    }
+
+    @Override
+    public void initState(State<String, byte[]> state) {
+
+    }
+  }
+
 
   @SuppressWarnings({"rawtypes", "unchecked", "serial"})
   public class MockIBasicBolt implements IBasicBolt {
diff --git a/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java 
b/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
index c0d57d9..6cfa9de 100644
--- a/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
+++ b/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
@@ -86,7 +86,8 @@ public final class CppCheck {
       commandBuilder.add("--std=c++11");
       commandBuilder.add("--language=c++");
       commandBuilder.add("--error-exitcode=1"); // exit with 1 on error
-      commandBuilder.add("--library=googletest"); // use googletest cfg so 
that TEST_F is not considered syntax error
+      // use googletest cfg so that TEST_F is not considered syntax error
+      commandBuilder.add("--library=googletest");
       commandBuilder.addAll(sourceFiles);
       runChecker(commandBuilder);
 
@@ -113,7 +114,7 @@ public final class CppCheck {
 
     if (cppcheck.exitValue() == 1) {
       LOG.warning("cppcheck detected bad cpp files.");
-      System.exit(1);
+      throw new RuntimeException("cppcheck detected bad cpp files.");
     }
 
     if (cppcheck.exitValue() != 0) {

Reply via email to