Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242661816 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void testBasicTopology(boolean useLocalMessaging) throws Exception { + try (LocalCluster cluster = new LocalCluster.Builder() + .withSimulatedTime() + .withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) + .build()) { + Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); + Map<String, BoltDetails> boltMap = new HashMap<>(); + boltMap.put("2", + Thrift.prepareBoltDetails( + Collections.singletonMap( + Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), + new TestWordCounter(), 4)); + boltMap.put("3", + Thrift.prepareBoltDetails( + Collections.singletonMap( + Utils.getGlobalStreamId("1", null), + Thrift.prepareGlobalGrouping()), + new TestGlobalCount())); + boltMap.put("4", + Thrift.prepareBoltDetails( + Collections.singletonMap( + Utils.getGlobalStreamId("2", null), + Thrift.prepareGlobalGrouping()), + new TestAggregatesCounter())); + StormTopology topology = Thrift.buildTopology(spoutMap, boltMap); --- End diff -- Could we get a follow on JIRA to move the code that uses Thrift to create a topology over to using TopologyBuilder? I think these APIs were used because they are a little cleaner in clojure, but in java they take up a lot more code, and it is harder to follow.
---