Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242698501 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void testBasicTopology(boolean useLocalMessaging) throws Exception { + try (LocalCluster cluster = new LocalCluster.Builder() + .withSimulatedTime() + .withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) + .build()) { + Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); + Map<String, BoltDetails> boltMap = new HashMap<>(); + boltMap.put("2", + Thrift.prepareBoltDetails( + Collections.singletonMap( + Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), + new TestWordCounter(), 4)); + boltMap.put("3", + Thrift.prepareBoltDetails( + Collections.singletonMap( + Utils.getGlobalStreamId("1", null), + Thrift.prepareGlobalGrouping()), + new TestGlobalCount())); + boltMap.put("4", + Thrift.prepareBoltDetails( + Collections.singletonMap( + Utils.getGlobalStreamId("2", null), + Thrift.prepareGlobalGrouping()), + new TestAggregatesCounter())); + StormTopology topology = Thrift.buildTopology(spoutMap, boltMap); + + Map<String, Object> stormConf = new HashMap<>(); + stormConf.put(Config.TOPOLOGY_WORKERS, 2); + stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true); + + List<Values> testValues = new ArrayList<>(); + testValues.add(new Values("nathan")); + testValues.add(new Values("bob")); + testValues.add(new Values("joey")); + testValues.add(new Values("nathan")); + List<FixedTuple> testTuples = testValues.stream() + .map(value -> new FixedTuple(value)) + .collect(Collectors.toList()); --- End diff -- Yes, absolutely. I'll change it.
---