[FLINK-5522] [storm compatibility] Move Storm LocalCluster based test to a separate class
This fixes the problem that the Storm LocalCluster can't run with powermock This closes #3138 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d05fc377 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d05fc377 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d05fc377 Branch: refs/heads/master Commit: d05fc377ee688b231fb1b0daeb8a34fd054f3ca1 Parents: 5313459 Author: liuyuzhong7 <[email protected]> Authored: Thu Feb 9 16:16:15 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Sat Feb 18 19:19:34 2017 +0100 ---------------------------------------------------------------------- .../storm/wrappers/WrapperSetupHelperTest.java | 167 +--------------- .../WrapperSetupInLocalClusterTest.java | 190 +++++++++++++++++++ 2 files changed, 191 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d05fc377/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java index 5e29ac4..5f38705 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java @@ -17,29 +17,15 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.generated.ComponentCommon; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.task.TopologyContext; +import org.apache.flink.storm.util.AbstractTest; import org.apache.storm.topology.IComponent; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; - -import org.apache.flink.storm.api.FlinkTopology; -import org.apache.flink.storm.util.AbstractTest; -import org.apache.flink.storm.util.TestDummyBolt; -import org.apache.flink.storm.util.TestDummySpout; -import org.apache.flink.storm.util.TestSink; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; - import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; - import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -47,14 +33,9 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; import static java.util.Collections.singleton; - import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest(WrapperSetupHelper.class) @@ -150,150 +131,4 @@ public class WrapperSetupHelperTest extends AbstractTest { boltOrSpout, numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null)); } - - @Test - public void testCreateTopologyContext() { - HashMap<String, Integer> dops = new HashMap<String, Integer>(); - dops.put("spout1", 1); - dops.put("spout2", 3); - dops.put("bolt1", 1); - dops.put("bolt2", 2); - dops.put("sink", 1); - - HashMap<String, Integer> taskCounter = new HashMap<String, Integer>(); - taskCounter.put("spout1", 0); - taskCounter.put("spout2", 0); - taskCounter.put("bolt1", 0); - taskCounter.put("bolt2", 0); - taskCounter.put("sink", 0); - - HashMap<String, IComponent> operators = new HashMap<String, IComponent>(); - operators.put("spout1", new TestDummySpout()); - operators.put("spout2", new TestDummySpout()); - operators.put("bolt1", new TestDummyBolt()); - operators.put("bolt2", new TestDummyBolt()); - operators.put("sink", new TestSink()); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1")); - builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2")); - builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1"); - builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); - builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) - .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); - -// LocalCluster cluster = new LocalCluster(); -// Config c = new Config(); -// c.setNumAckers(0); -// cluster.submitTopology("test", c, builder.createTopology()); -// -// while (TestSink.result.size() != 8) { -// Utils.sleep(100); -// } -// cluster.shutdown(); - - final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder); - StormTopology stormTopology = flinkBuilder.getStormTopology(); - - Set<Integer> taskIds = new HashSet<Integer>(); - - for (TopologyContext expectedContext : TestSink.result) { - final String thisComponentId = expectedContext.getThisComponentId(); - int index = taskCounter.get(thisComponentId); - - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); - when(context.getTaskName()).thenReturn(thisComponentId); - when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId)); - when(context.getIndexOfThisSubtask()).thenReturn(index); - taskCounter.put(thisComponentId, ++index); - - Config stormConfig = new Config(); - stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test"); - - TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context, - operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig); - - ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId); - ComponentCommon common = topologyContext.getComponentCommon(thisComponentId); - - Assert.assertNull(topologyContext.getCodeDir()); - Assert.assertNull(common.get_json_conf()); - Assert.assertNull(topologyContext.getExecutorData(null)); - Assert.assertNull(topologyContext.getPIDDir()); - Assert.assertNull(topologyContext.getResource(null)); - Assert.assertNull(topologyContext.getSharedExecutor()); - Assert.assertNull(expectedContext.getTaskData(null)); - Assert.assertNull(topologyContext.getThisWorkerPort()); - - Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId())); - - Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs()); - Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint()); - Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams()); - Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds()); - Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId), - topologyContext.getComponentStreams(thisComponentId)); - Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId()); - Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources()); - Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams()); - Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets()); - Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size()); - - for (int taskId : topologyContext.getComponentTasks(thisComponentId)) { - Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId)); - } - - for (String componentId : expectedContext.getComponentIds()) { - Assert.assertEquals(expectedContext.getSources(componentId), - topologyContext.getSources(componentId)); - Assert.assertEquals(expectedContext.getTargets(componentId), - topologyContext.getTargets(componentId)); - - for (String streamId : expectedContext.getComponentStreams(componentId)) { - Assert.assertEquals( - expectedContext.getComponentOutputFields(componentId, streamId).toList(), - topologyContext.getComponentOutputFields(componentId, streamId).toList()); - } - } - - for (String streamId : expectedContext.getThisStreams()) { - Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(), - topologyContext.getThisOutputFields(streamId).toList()); - } - - HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>(); - Set<Integer> allTaskIds = new HashSet<Integer>(); - for (String componentId : expectedContext.getComponentIds()) { - List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId); - List<Integer> tasks = topologyContext.getComponentTasks(componentId); - - Iterator<Integer> p_it = possibleTasks.iterator(); - Iterator<Integer> t_it = tasks.iterator(); - while(p_it.hasNext()) { - Assert.assertTrue(t_it.hasNext()); - Assert.assertNull(taskToComponents.put(p_it.next(), componentId)); - Assert.assertTrue(allTaskIds.add(t_it.next())); - } - Assert.assertFalse(t_it.hasNext()); - } - - Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent()); - Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId())); - - try { - topologyContext.getHooks(); - Assert.fail(); - } catch (UnsupportedOperationException e) { /* expected */ } - - try { - topologyContext.getRegisteredMetricByName(null); - Assert.fail(); - } catch (UnsupportedOperationException e) { /* expected */ } - } - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/d05fc377/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java new file mode 100644 index 0000000..00173df --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import org.apache.flink.storm.api.FlinkTopology; +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IComponent; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class WrapperSetupInLocalClusterTest extends AbstractTest { + + @Test + public void testCreateTopologyContext() { + HashMap<String, Integer> dops = new HashMap<String, Integer>(); + dops.put("spout1", 1); + dops.put("spout2", 3); + dops.put("bolt1", 1); + dops.put("bolt2", 2); + dops.put("sink", 1); + + HashMap<String, Integer> taskCounter = new HashMap<String, Integer>(); + taskCounter.put("spout1", 0); + taskCounter.put("spout2", 0); + taskCounter.put("bolt1", 0); + taskCounter.put("bolt2", 0); + taskCounter.put("sink", 0); + + HashMap<String, IComponent> operators = new HashMap<String, IComponent>(); + operators.put("spout1", new TestDummySpout()); + operators.put("spout2", new TestDummySpout()); + operators.put("bolt1", new TestDummyBolt()); + operators.put("bolt2", new TestDummyBolt()); + operators.put("sink", new TestSink()); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1")); + builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2")); + builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1"); + builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); + builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) + .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) + .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) + .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) + .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); + + LocalCluster cluster = new LocalCluster(); + Config c = new Config(); + c.setNumAckers(0); + cluster.submitTopology("test", c, builder.createTopology()); + + while (TestSink.result.size() != 8) { + Utils.sleep(100); + } + cluster.shutdown(); + final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder); + StormTopology stormTopology = flinkBuilder.getStormTopology(); + + Set<Integer> taskIds = new HashSet<Integer>(); + + for (TopologyContext expectedContext : TestSink.result) { + final String thisComponentId = expectedContext.getThisComponentId(); + int index = taskCounter.get(thisComponentId); + + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + when(context.getTaskName()).thenReturn(thisComponentId); + when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId)); + when(context.getIndexOfThisSubtask()).thenReturn(index); + taskCounter.put(thisComponentId, ++index); + + Config stormConfig = new Config(); + stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test"); + + TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context, + operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig); + + ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId); + ComponentCommon common = topologyContext.getComponentCommon(thisComponentId); + + Assert.assertNull(topologyContext.getCodeDir()); + Assert.assertNull(common.get_json_conf()); + Assert.assertNull(topologyContext.getExecutorData(null)); + Assert.assertNull(topologyContext.getPIDDir()); + Assert.assertNull(topologyContext.getResource(null)); + Assert.assertNull(topologyContext.getSharedExecutor()); + Assert.assertNull(expectedContext.getTaskData(null)); + Assert.assertNull(topologyContext.getThisWorkerPort()); + + Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId())); + + Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs()); + Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint()); + Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams()); + Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds()); + Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId), + topologyContext.getComponentStreams(thisComponentId)); + Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId()); + Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources()); + Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams()); + Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets()); + Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size()); + + for (int taskId : topologyContext.getComponentTasks(thisComponentId)) { + Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId)); + } + + for (String componentId : expectedContext.getComponentIds()) { + Assert.assertEquals(expectedContext.getSources(componentId), + topologyContext.getSources(componentId)); + Assert.assertEquals(expectedContext.getTargets(componentId), + topologyContext.getTargets(componentId)); + + for (String streamId : expectedContext.getComponentStreams(componentId)) { + Assert.assertEquals( + expectedContext.getComponentOutputFields(componentId, streamId).toList(), + topologyContext.getComponentOutputFields(componentId, streamId).toList()); + } + } + + for (String streamId : expectedContext.getThisStreams()) { + Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(), + topologyContext.getThisOutputFields(streamId).toList()); + } + + HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>(); + Set<Integer> allTaskIds = new HashSet<Integer>(); + for (String componentId : expectedContext.getComponentIds()) { + List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId); + List<Integer> tasks = topologyContext.getComponentTasks(componentId); + + Iterator<Integer> p_it = possibleTasks.iterator(); + Iterator<Integer> t_it = tasks.iterator(); + while(p_it.hasNext()) { + Assert.assertTrue(t_it.hasNext()); + Assert.assertNull(taskToComponents.put(p_it.next(), componentId)); + Assert.assertTrue(allTaskIds.add(t_it.next())); + } + Assert.assertFalse(t_it.hasNext()); + } + + Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent()); + Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId())); + + try { + topologyContext.getHooks(); + Assert.fail(); + } catch (UnsupportedOperationException e) { /* expected */ } + + try { + topologyContext.getRegisteredMetricByName(null); + Assert.fail(); + } catch (UnsupportedOperationException e) { /* expected */ } + } + } + +}
