[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709108#comment-14709108
]
ASF GitHub Bot commented on FLINK-2525:
---------------------------------------
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1046#discussion_r37741352
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
---
@@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenWithStormConf() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+ final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+
+ Configuration jobConfiguration = new Configuration();
+ jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
+ jobConfiguration.setInteger(new String("delimitSize"), 1024);
+ Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+ new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
+ mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
+ mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
+ new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
+ mock(TaskManagerRuntimeInfo.class));
+ StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
+ mock(KeySelector.class),
+ mock(StateHandleProvider.class), mock(Map.class));
+
+ wrapper.setup(mock(Output.class), ctx);
+ wrapper.open(mock(Configuration.class));
+
+ verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
--- End diff --
You should not verify with `any(Map.class)`; instead, check that the map
passed to `prepare()` contains the values you set above in `jobConfiguration`.
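To illustrate the kind of check meant here, the following is a minimal, self-contained sketch in plain Java. It deliberately avoids Mockito and the Flink/Storm classes so that it runs on its own: `RecordingBolt` and `FakeWrapper` are hypothetical stand-ins invented for this example, not classes from the pull request. In the actual test, something like Mockito's `ArgumentCaptor` (or a custom matcher on the `Map` argument) would presumably play the role of the recording stub.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfMapCheck {

    /** Hypothetical stand-in for a bolt: records the config map it is prepared with. */
    static class RecordingBolt {
        Map<String, Object> seenConf;

        void prepare(Map<String, Object> conf) {
            this.seenConf = conf;
        }
    }

    /** Hypothetical stand-in for the wrapper: forwards a copy of the job configuration. */
    static class FakeWrapper {
        private final RecordingBolt bolt;

        FakeWrapper(RecordingBolt bolt) {
            this.bolt = bolt;
        }

        void open(Map<String, Object> jobConf) {
            bolt.prepare(new HashMap<>(jobConf));
        }
    }

    /** Sets the same two values as the test in the diff and returns what the bolt received. */
    static Map<String, Object> run() {
        Map<String, Object> jobConf = new HashMap<>();
        jobConf.put("path", "/home/user/file.txt");
        jobConf.put("delimitSize", 1024);

        RecordingBolt bolt = new RecordingBolt();
        new FakeWrapper(bolt).open(jobConf);
        return bolt.seenConf;
    }

    public static void main(String[] args) {
        Map<String, Object> seen = run();

        // Check the actual contents, not merely that some Map was passed.
        if (!"/home/user/file.txt".equals(seen.get("path"))) {
            throw new AssertionError("unexpected value for 'path': " + seen.get("path"));
        }
        if (!Integer.valueOf(1024).equals(seen.get("delimitSize"))) {
            throw new AssertionError("unexpected value for 'delimitSize': " + seen.get("delimitSize"));
        }
        System.out.println("config map contains the expected values");
    }
}
```

The point of the sketch is only the shape of the assertion: capture the map handed to `prepare(...)` and compare its entries against what was put into the job configuration, so the test fails if the wrapper passes an empty or unrelated map.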
> Add configuration support in Storm-compatibility
> ------------------------------------------------
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
> Issue Type: New Feature
> Components: flink-contrib
> Reporter: fangfengbin
> Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and
> `Bolt.prepare()`, respectively. Both methods take a config `Map` as their
> first parameter. This map is currently not populated. Thus, Spouts and Bolts
> cannot be configured with user-defined parameters. To support this feature,
> the spout and bolt wrapper classes need to be extended to create a proper
> `Map` object. Furthermore, the clients need to be extended to take a `Map`
> and translate it into a Flink `Configuration` that is forwarded to the
> wrappers for proper initialization of the map.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)