[ https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709109#comment-14709109 ]

ASF GitHub Bot commented on FLINK-2525:
---------------------------------------

Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37741464
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java ---
    @@ -52,6 +75,32 @@ public void testRunExecuteCancelInfinite() throws Exception {
        }
     
        @Test
    +   public void testOpen() throws Exception {
    +           final IRichSpout spout = mock(IRichSpout.class);
    +           final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
    +
    +           Configuration jobConfiguration = new Configuration();
    +           jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +           jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +           Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +                           new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +                           mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +                           mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +                           new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +                           mock(TaskManagerRuntimeInfo.class));
    +           StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +                           mock(KeySelector.class),
    +                           mock(StateHandleProvider.class), mock(Map.class));
    +
    +           spoutWrapper.setRuntimeContext(runtimeContext);
    +           spoutWrapper.open(mock(Configuration.class));
    +           final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
    +           spoutWrapper.cancel();
    +           spoutWrapper.run(ctx);
    +           verify(spout).open(any(Map.class), any(TopologyContext.class), any(SpoutOutputCollector.class));
    +   }
    +
    --- End diff --
    
    Same comment as in `StormBoltWrapperTest`.


> Add configuration support in Storm-compatibility
> ------------------------------------------------
>
>                 Key: FLINK-2525
>                 URL: https://issues.apache.org/jira/browse/FLINK-2525
>             Project: Flink
>          Issue Type: New Feature
>          Components: flink-contrib
>            Reporter: fangfengbin
>            Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare()`, respectively. Both methods take a config `Map` as their 
> first parameter. This map is currently not populated, so Spouts and Bolts 
> cannot be configured with user-defined parameters. To support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper 
> `Map` object. Furthermore, the clients need to be extended to take a `Map` 
> and translate it into a Flink `Configuration` that is forwarded to the 
> wrappers for proper initialization of the map.
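
For illustration only, the round trip described in the issue could look roughly like the sketch below. It is not the code from the pull request: the class `StormConfSketch` and the methods `flatten` and `rebuild` are hypothetical names, and a plain `Map<String, String>` stands in for the Flink `Configuration` so the example runs without Flink or Storm on the classpath. The keys `path` and `delimitSize` are borrowed from the test in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

public class StormConfSketch {

    // Client side (hypothetical): flatten the user's Storm-style config map into
    // string key/value pairs, the kind of data a string-keyed configuration
    // object can carry. Entries with a null key or value are skipped.
    static Map<String, String> flatten(Map<?, ?> stormConf) {
        Map<String, String> flat = new HashMap<>();
        for (Map.Entry<?, ?> e : stormConf.entrySet()) {
            if (e.getKey() != null && e.getValue() != null) {
                flat.put(e.getKey().toString(), e.getValue().toString());
            }
        }
        return flat;
    }

    // Wrapper side (hypothetical): rebuild the Map that would be handed to
    // open(...) or prepare(...) as the first parameter.
    static Map<String, Object> rebuild(Map<String, String> flat) {
        return new HashMap<String, Object>(flat);
    }

    public static void main(String[] args) {
        Map<String, Object> userConf = new HashMap<>();
        userConf.put("path", "/home/user/file.txt");
        userConf.put("delimitSize", 1024);

        // What a spout or bolt would see after the round trip.
        Map<String, Object> seenBySpout = rebuild(flatten(userConf));
        System.out.println(seenBySpout.get("path"));
        System.out.println(seenBySpout.get("delimitSize"));
    }
}
```

Note that this stand-in turns every value into a `String`, so `delimitSize` arrives as `"1024"` rather than as an `Integer`. An actual implementation would have to decide how value types are preserved across the translation.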



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
