Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2927#discussion_r242663110
--- Diff:
storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java
---
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.integration;
+
+import static org.apache.storm.integration.AssertLoop.assertAcked;
+import static org.apache.storm.integration.AssertLoop.assertFailed;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.Thrift;
+import org.apache.storm.Thrift.BoltDetails;
+import org.apache.storm.Thrift.SpoutDetails;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.hooks.BaseTaskHook;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.AckFailMapTracker;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestConfBolt;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestPlannerSpout;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@IntegrationTest
+public class TopologyIntegrationTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"true", "false"})
+ public void testBasicTopology(boolean useLocalMessaging) throws
Exception {
+ try (LocalCluster cluster = new LocalCluster.Builder()
+ .withSimulatedTime()
+ .withSupervisors(4)
+
.withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ,
!useLocalMessaging))
+ .build()) {
+ Map<String, SpoutDetails> spoutMap =
Collections.singletonMap("1", Thrift.prepareSpoutDetails(new
TestWordSpout(true), 3));
+ Map<String, BoltDetails> boltMap = new HashMap<>();
+ boltMap.put("2",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+
Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
+ new TestWordCounter(), 4));
+ boltMap.put("3",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("1", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestGlobalCount()));
+ boltMap.put("4",
+ Thrift.prepareBoltDetails(
+ Collections.singletonMap(
+ Utils.getGlobalStreamId("2", null),
+ Thrift.prepareGlobalGrouping()),
+ new TestAggregatesCounter()));
+ StormTopology topology = Thrift.buildTopology(spoutMap,
boltMap);
+
+ Map<String, Object> stormConf = new HashMap<>();
+ stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+ stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE,
true);
+
+ List<Values> testValues = new ArrayList<>();
+ testValues.add(new Values("nathan"));
+ testValues.add(new Values("bob"));
+ testValues.add(new Values("joey"));
+ testValues.add(new Values("nathan"));
+ List<FixedTuple> testTuples = testValues.stream()
+ .map(value -> new FixedTuple(value))
+ .collect(Collectors.toList());
--- End diff --
Wouldn't it be simpler to do something like this?
```
List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey",
"nathan").stream()
.map(name -> new FixedTuple(new Values(name)))
.collect(Collectors.toList());
```
---