[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708995#comment-14708995
 ] 

ASF GitHub Bot commented on FLINK-2525:
---------------------------------------

GitHub user ffbin opened a pull request:

    https://github.com/apache/flink/pull/1046

    [FLINK-2525]Add configuration support in Storm-compatibility

    - Enable the config to be used in Spout.open() and Bolt.prepare().
    
    Example:
    public static void main(final String[] args) {
        String topologyId = "Streaming WordCount";
        final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        ...
        final Config conf = new Config();
        conf.put("wordsFile", "/home/user/");
        conf.put("delimitSize", 1024);
        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology(topologyId, conf, builder.createTopology());
        Utils.sleep(10 * 1000);
        cluster.killTopology(topologyId);
        cluster.shutdown();     
    }
    
    public class WordReader implements IRichSpout {
        ....
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                try {
                        this.context = context;
                        // Map values are untyped, so cast to the expected type
                        this.fileReader = new FileReader((String) conf.get("wordsFile"));
                } catch (FileNotFoundException e) {
                        throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]", e);
                }
                this.collector = collector;
        }
    }
    
    public final class StormBoltTokenizer implements IRichBolt {
        ....
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                // Map values are untyped, so cast to the expected type
                this.delimitSize = (Integer) stormConf.get("delimitSize");
                this.collector = collector;
        }       
    }


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ffbin/flink FLINK-2525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1046
    
----
commit c6aebc10b7a010cc9cd5fb5b6505fdbc942ab7b9
Author: ffbin <[email protected]>
Date:   2015-08-24T09:07:26Z

    [FLINK-2525]Add configuration support in Storm-compatibility

----


> Add configuration support in Storm-compatibility
> ------------------------------------------------
>
>                 Key: FLINK-2525
>                 URL: https://issues.apache.org/jira/browse/FLINK-2525
>             Project: Flink
>          Issue Type: New Feature
>          Components: flink-contrib
>            Reporter: fangfengbin
>            Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare(...)`, respectively. Both methods take a config `Map` as their 
> first parameter. This map is currently not populated, so Spouts and Bolts 
> cannot be configured with user-defined parameters. To support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper 
> `Map` object. Furthermore, the clients need to be extended to take a `Map` 
> and translate it into a Flink `Configuration` that is forwarded to the 
> wrappers for proper initialization of the map.
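
The forwarding described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the pull request: `ConfigurableComponent`, `ComponentWrapper`, and this `WordReader` are hypothetical stand-ins for the Storm interfaces and the Flink wrapper classes, and the intermediate translation into a Flink `Configuration` is left out. The keys `wordsFile` and `delimitSize` are taken from the example in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigForwardingSketch {

    /** Hypothetical stand-in for a spout or bolt; only the config-taking hook is modeled. */
    interface ConfigurableComponent {
        void open(Map<String, Object> conf);
    }

    /** Hypothetical wrapper: keeps the user's config and hands it to the component on initialization. */
    static final class ComponentWrapper {
        private final ConfigurableComponent component;
        private final Map<String, Object> conf;

        ComponentWrapper(ConfigurableComponent component, Map<String, Object> userConf) {
            this.component = component;
            // Defensive copy so later changes by the caller do not leak into the wrapper.
            this.conf = new HashMap<>(userConf);
        }

        void initialize() {
            // The component receives a populated map instead of an empty one.
            component.open(new HashMap<>(conf));
        }
    }

    /** Hypothetical component that reads two user-defined parameters. */
    static final class WordReader implements ConfigurableComponent {
        String wordsFile;
        int delimitSize;

        @Override
        public void open(Map<String, Object> conf) {
            // Values are untyped in the map, so each one is cast to its expected type.
            this.wordsFile = (String) conf.get("wordsFile");
            this.delimitSize = ((Number) conf.get("delimitSize")).intValue();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("wordsFile", "/home/user/");
        conf.put("delimitSize", 1024);

        WordReader reader = new WordReader();
        new ComponentWrapper(reader, conf).initialize();

        System.out.println(reader.wordsFile + " " + reader.delimitSize);
    }
}
```

The wrapper copies the map twice (once on construction, once on hand-off) so that neither the client nor the component can mutate the other's view; whether the actual wrappers do this is not stated in the issue.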



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
