[ https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709109#comment-14709109 ]
ASF GitHub Bot commented on FLINK-2525: --------------------------------------- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1046#discussion_r37741464 --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java --- @@ -52,6 +75,32 @@ public void testRunExecuteCancelInfinite() throws Exception { } @Test + public void testOpen() throws Exception { + final IRichSpout spout = mock(IRichSpout.class); + final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout); + + Configuration jobConfiguration = new Configuration(); + jobConfiguration.setString(new String("path"), new String("/home/user/file.txt")); + jobConfiguration.setInteger(new String("delimitSize"), 1024); + Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(), + new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class), + mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class), + mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class), + new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class), + mock(TaskManagerRuntimeInfo.class)); + StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(), + mock(KeySelector.class), + mock(StateHandleProvider.class), mock(Map.class)); + + spoutWrapper.setRuntimeContext(runtimeContext); + spoutWrapper.open(mock(Configuration.class)); + final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class); + spoutWrapper.cancel(); + spoutWrapper.run(ctx); + verify(spout).open(any(Map.class), any(TopologyContext.class), any(SpoutOutputCollector.class)); + } + --- End diff -- Some comment as in `StormBoltWrapperTest` > Add configuration support in Storm-compatibility > ------------------------------------------------ > > Key: FLINK-2525 > URL: https://issues.apache.org/jira/browse/FLINK-2525 > Project: Flink > Issue Type: New Feature > Components: flink-contrib > Reporter: fangfengbin > Assignee: fangfengbin > > Spouts and Bolt are initialized by a call to `Spout.open(...)` and > `Bolt.prepare()`, respectively. Both methods have a config `Map` as first > parameter. This map is currently not populated. Thus, Spouts and Bolts cannot > be configure with user defined parameters. In order to support this feature, > spout and bolt wrapper classes need to be extended to create a proper `Map` > object. Furthermore, the clients need to be extended to take a `Map`, > translate it into a Flink `Configuration` that is forwarded to the wrappers > for proper initialization of the map. -- This message was sent by Atlassian JIRA (v6.3.4#6332)