This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new e3be7e4 fix the issue that Heron ECO stream builder does not handle
IStatefulWindowedBolt (#3235)
e3be7e4 is described below
commit e3be7e41bef1900ec2064b575b76d9eaa8f25cfd
Author: SiMing Weng <[email protected]>
AuthorDate: Fri Apr 19 12:21:10 2019 -0400
fix the issue that Heron ECO stream builder does not handle
IStatefulWindowedBolt (#3235)
make Heron ECO stream builder handle IStatefulWindowedBolt
add unit test
---
.../heron/eco/builder/heron/StreamBuilder.java | 15 ++++-
.../eco/builder/heron/HeronStreamBuilderTest.java | 65 ++++++++++++++++++++++
.../src/org/apache/bazel/cppcheck/CppCheck.java | 5 +-
3 files changed, 81 insertions(+), 4 deletions(-)
diff --git a/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
b/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
index d2d4530..90fdb58 100644
--- a/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
+++ b/eco/src/java/org/apache/heron/eco/builder/heron/StreamBuilder.java
@@ -19,6 +19,7 @@
package org.apache.heron.eco.builder.heron;
+import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;
@@ -26,6 +27,7 @@ import java.util.Map;
import org.apache.heron.api.bolt.IBasicBolt;
import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.bolt.IStatefulWindowedBolt;
import org.apache.heron.api.bolt.IWindowedBolt;
import org.apache.heron.api.grouping.CustomStreamGrouping;
import org.apache.heron.api.topology.BoltDeclarer;
@@ -42,8 +44,8 @@ import org.apache.heron.eco.definition.StreamDefinition;
public class StreamBuilder {
- protected void buildStreams(EcoExecutionContext executionContext,
TopologyBuilder builder,
- ObjectBuilder objectBuilder)
+ protected <K extends Serializable, V extends Serializable> void buildStreams(
+ EcoExecutionContext executionContext, TopologyBuilder builder,
ObjectBuilder objectBuilder)
throws IllegalAccessException, InstantiationException,
ClassNotFoundException,
NoSuchFieldException, InvocationTargetException {
EcoTopologyDefinition topologyDefinition =
executionContext.getTopologyDefinition();
@@ -68,6 +70,15 @@ public class StreamBuilder {
topologyDefinition.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
+ } else if (boltObj instanceof IStatefulWindowedBolt) {
+ if (declarer == null) {
+ //noinspection unchecked
+ declarer = builder.setBolt(
+ stream.getTo(),
+ (IStatefulWindowedBolt<K, V>) boltObj,
+ topologyDefinition.parallelismForBolt(stream.getTo()));
+ declarers.put(stream.getTo(), declarer);
+ }
} else if (boltObj instanceof IWindowedBolt) {
if (declarer == null) {
declarer = builder.setBolt(
diff --git
a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
index 47f1b6a..72ea2a8 100644
---
a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
+++
b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
@@ -32,12 +32,14 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import org.apache.heron.api.bolt.BaseStatefulWindowedBolt;
import org.apache.heron.api.bolt.BasicOutputCollector;
import org.apache.heron.api.bolt.IBasicBolt;
import org.apache.heron.api.bolt.IRichBolt;
import org.apache.heron.api.bolt.IWindowedBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.grouping.CustomStreamGrouping;
+import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.BoltDeclarer;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyBuilder;
@@ -262,6 +264,56 @@ public class HeronStreamBuilderTest {
verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition),
same(mockContext));
}
+ @Test
+ public void buildStreams_SpoutToIStatefulWindowedBolt() throws
ClassNotFoundException,
+ InvocationTargetException,
+ NoSuchFieldException,
+ InstantiationException,
+ IllegalAccessException {
+ final int iRichBoltParallelism = 1;
+ final String to = "to";
+ final String from = "from";
+ final String streamId = "id";
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setFrom(from);
+ streamDefinition.setTo(to);
+ streamDefinition.setId(streamId);
+ List<StreamDefinition> streams = new ArrayList<>();
+ streams.add(streamDefinition);
+ GroupingDefinition groupingDefinition = new GroupingDefinition();
+ groupingDefinition.setType(GroupingDefinition.Type.CUSTOM);
+ MockCustomObjectDefinition mockCustomObjectDefinition = new
MockCustomObjectDefinition();
+
+ groupingDefinition.setCustomClass(mockCustomObjectDefinition);
+ List<String> args = new ArrayList<>();
+ args.add("arg1");
+ groupingDefinition.setArgs(args);
+ groupingDefinition.setStreamId(streamId);
+ streamDefinition.setGrouping(groupingDefinition);
+ MockIStatefulWindowedBolt mockIStatefulWindowedBolt = new
MockIStatefulWindowedBolt();
+ MockCustomStreamGrouping mockCustomStreamGrouping = new
MockCustomStreamGrouping();
+
+ when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+ when(mockContext.getBolt(eq(to))).thenReturn(mockIStatefulWindowedBolt);
+ when(mockDefinition.getStreams()).thenReturn(streams);
+
when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism);
+ when(mockTopologyBuilder.setBolt(eq(to),
+ eq(mockIStatefulWindowedBolt),
eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer);
+ when(mockObjectBuilder.buildObject(eq(mockCustomObjectDefinition),
+ eq(mockContext))).thenReturn(mockCustomStreamGrouping);
+
+ subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+ verify(mockContext).getTopologyDefinition();
+ verify(mockContext).getBolt(eq(to));
+ verify(mockDefinition).parallelismForBolt(eq(to));
+ verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIStatefulWindowedBolt),
eq(iRichBoltParallelism));
+ verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId),
eq(mockCustomStreamGrouping));
+ verify(mockContext).setStreams(anyMap());
+ verify(mockDefinition).getStreams();
+ verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition),
same(mockContext));
+ }
+
private class MockCustomObjectDefinition extends ObjectDefinition {
}
@@ -344,6 +396,19 @@ public class HeronStreamBuilderTest {
}
}
+ private class MockIStatefulWindowedBolt extends
BaseStatefulWindowedBolt<String, byte[]> {
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+
+ }
+
+ @Override
+ public void initState(State<String, byte[]> state) {
+
+ }
+ }
+
@SuppressWarnings({"rawtypes", "unchecked", "serial"})
public class MockIBasicBolt implements IBasicBolt {
diff --git a/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
b/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
index c0d57d9..6cfa9de 100644
--- a/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
+++ b/tools/java/src/org/apache/bazel/cppcheck/CppCheck.java
@@ -86,7 +86,8 @@ public final class CppCheck {
commandBuilder.add("--std=c++11");
commandBuilder.add("--language=c++");
commandBuilder.add("--error-exitcode=1"); // exit with 1 on error
- commandBuilder.add("--library=googletest"); // use googletest cfg so
that TEST_F is not considered syntax error
+ // use googletest cfg so that TEST_F is not considered syntax error
+ commandBuilder.add("--library=googletest");
commandBuilder.addAll(sourceFiles);
runChecker(commandBuilder);
@@ -113,7 +114,7 @@ public final class CppCheck {
if (cppcheck.exitValue() == 1) {
LOG.warning("cppcheck detected bad cpp files.");
- System.exit(1);
+ throw new RuntimeException("cppcheck detected bad cpp files.");
}
if (cppcheck.exitValue() != 0) {