[
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708995#comment-14708995
]
ASF GitHub Bot commented on FLINK-2525:
---------------------------------------
GitHub user ffbin opened a pull request:
https://github.com/apache/flink/pull/1046
[FLINK-2525]Add configuration support in Storm-compatibility
- enable config can used in Spouts.open() and Bout.prepare().
Example like this:
public static void main(final String[] args) {
String topologyId = "Streaming WordCount";
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
...
final Config conf = new Config();
conf.put("wordsFile", "/home/user/");
conf.put("delimitSize", 1024);
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, conf, builder.createTopology());
Utils.sleep(10 * 1000);
cluster.killTopology(topologyId);
cluster.shutdown();
}
public class WordReader implements IRichSpout {
....
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile"));
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file
["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
}
public final class StormBoltTokenizer implements IRichBolt {
....
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.delimitSize = stormConf.get("delimitSize");
this.collector = collector;
}
}
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ffbin/flink FLINK-2525
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1046.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1046
----
commit c6aebc10b7a010cc9cd5fb5b6505fdbc942ab7b9
Author: ffbin <[email protected]>
Date: 2015-08-24T09:07:26Z
[FLINK-2525]Add configuration support in Storm-compatibility
----
> Add configuration support in Storm-compatibility
> ------------------------------------------------
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
> Issue Type: New Feature
> Components: flink-contrib
> Reporter: fangfengbin
> Assignee: fangfengbin
>
> Spouts and Bolt are initialized by a call to `Spout.open(...)` and
> `Bolt.prepare()`, respectively. Both methods have a config `Map` as first
> parameter. This map is currently not populated. Thus, Spouts and Bolts cannot
> be configure with user defined parameters. In order to support this feature,
> spout and bolt wrapper classes need to be extended to create a proper `Map`
> object. Furthermore, the clients need to be extended to take a `Map`,
> translate it into a Flink `Configuration` that is forwarded to the wrappers
> for proper initialization of the map.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)