Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2927#discussion_r242663110
  
    --- Diff: 
storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java 
---
    @@ -0,0 +1,927 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.integration;
    +
    +import static org.apache.storm.integration.AssertLoop.assertAcked;
    +import static org.apache.storm.integration.AssertLoop.assertFailed;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertThat;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.Testing;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.Thrift.BoltDetails;
    +import org.apache.storm.Thrift.SpoutDetails;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.hooks.BaseTaskHook;
    +import org.apache.storm.hooks.info.BoltAckInfo;
    +import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.hooks.info.BoltFailInfo;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.testing.AckFailMapTracker;
    +import org.apache.storm.testing.CompleteTopologyParam;
    +import org.apache.storm.testing.FeederSpout;
    +import org.apache.storm.testing.FixedTuple;
    +import org.apache.storm.testing.IntegrationTest;
    +import org.apache.storm.testing.MockedSources;
    +import org.apache.storm.testing.TestAggregatesCounter;
    +import org.apache.storm.testing.TestConfBolt;
    +import org.apache.storm.testing.TestGlobalCount;
    +import org.apache.storm.testing.TestPlannerSpout;
    +import org.apache.storm.testing.TestWordCounter;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.testing.TrackedTopology;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.junit.jupiter.api.Test;
    +import org.junit.jupiter.params.ParameterizedTest;
    +import org.junit.jupiter.params.provider.ValueSource;
    +
    +@IntegrationTest
    +public class TopologyIntegrationTest {
    +
    +    @ParameterizedTest
    +    @ValueSource(strings = {"true", "false"})
    +    public void testBasicTopology(boolean useLocalMessaging) throws 
Exception {
    +        try (LocalCluster cluster = new LocalCluster.Builder()
    +            .withSimulatedTime()
    +            .withSupervisors(4)
    +            
.withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, 
!useLocalMessaging))
    +            .build()) {
    +            Map<String, SpoutDetails> spoutMap = 
Collections.singletonMap("1", Thrift.prepareSpoutDetails(new 
TestWordSpout(true), 3));
    +            Map<String, BoltDetails> boltMap = new HashMap<>();
    +            boltMap.put("2",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        
Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
    +                    new TestWordCounter(), 4));
    +            boltMap.put("3",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("1", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestGlobalCount()));
    +            boltMap.put("4",
    +                Thrift.prepareBoltDetails(
    +                    Collections.singletonMap(
    +                        Utils.getGlobalStreamId("2", null),
    +                        Thrift.prepareGlobalGrouping()),
    +                    new TestAggregatesCounter()));
    +            StormTopology topology = Thrift.buildTopology(spoutMap, 
boltMap);
    +
    +            Map<String, Object> stormConf = new HashMap<>();
    +            stormConf.put(Config.TOPOLOGY_WORKERS, 2);
    +            stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, 
true);
    +
    +            List<Values> testValues = new ArrayList<>();
    +            testValues.add(new Values("nathan"));
    +            testValues.add(new Values("bob"));
    +            testValues.add(new Values("joey"));
    +            testValues.add(new Values("nathan"));
    +            List<FixedTuple> testTuples = testValues.stream()
    +                .map(value -> new FixedTuple(value))
    +                .collect(Collectors.toList());
    --- End diff --
    
    Wouldn't it be simpler to do something like this?
    
    ```
    List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
      .map(name -> new FixedTuple(new Values(name)))
      .collect(Collectors.toList());
    ```


---

Reply via email to