[ https://issues.apache.org/jira/browse/STORM-1015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901496#comment-14901496 ]

ASF GitHub Bot commented on STORM-1015:
---------------------------------------

Github user erikdw commented on a diff in the pull request:

    https://github.com/apache/storm/pull/705#discussion_r40032442
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionStateManagerFactory.java ---
    @@ -0,0 +1,68 @@
    +package storm.kafka;
    +
    +import backtype.storm.Config;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class PartitionStateManagerFactory {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(PartitionStateManagerFactory.class);
    +
    +    private ZkDataStore sharedZkDataStore;
    +
    +    private Map _stormConf;
    +    private SpoutConfig _spoutConfig;
    +
    +
    +    private ZkDataStore createZkDataStore(Map conf, SpoutConfig spoutConfig) {
    +        Map _zkDataStoreConf = new HashMap(conf);
    +        List<String> zkServers = _spoutConfig.zkServers;
    +        if (zkServers == null) {
    +            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        }
    +        Integer zkPort = _spoutConfig.zkPort;
    +        if (zkPort == null) {
    +            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    +        }
    +        _zkDataStoreConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
    +        _zkDataStoreConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
    +        _zkDataStoreConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
    +        return new ZkDataStore(_zkDataStoreConf);
    +
    +    }
    +
    +    public PartitionStateManagerFactory(Map stormConf, SpoutConfig spoutConfig) {
    +        this._stormConf = stormConf;
    +        this._spoutConfig = spoutConfig;
    +
    +        // default to orignal storm storage format
    +        if (_spoutConfig.stateStore == null || "storm".equals(_spoutConfig.stateStore)) {
    --- End diff --
    
    I feel like "storm" storage is confusing.  Maybe "zookeeper" instead?
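
One way to read this suggestion, as a rough sketch only: accept "zookeeper" as the name of the original ZooKeeper-backed store and keep "storm" as a legacy alias so existing configurations still work. The class name `StateStoreNames`, the constants, and the helper `isZookeeperStore` are hypothetical and not part of the pull request; only the `stateStore` field, the null default, and the "storm" value come from the diff above.

```java
// Hypothetical helper, not part of the pull request under review.
class StateStoreNames {
    // Proposed clearer name for the original ZooKeeper-backed offset storage.
    static final String ZOOKEEPER = "zookeeper";
    // Value used in the diff; kept here as an assumed backward-compatible alias.
    static final String LEGACY_STORM = "storm";

    /**
     * Returns true when the configured stateStore value selects the
     * original ZooKeeper-backed storage. A null value keeps the default,
     * mirroring the null check in the diff.
     */
    static boolean isZookeeperStore(String stateStore) {
        return stateStore == null
                || ZOOKEEPER.equals(stateStore)
                || LEGACY_STORM.equals(stateStore);
    }

    public static void main(String[] args) {
        System.out.println(isZookeeperStore(null));
        System.out.println(isZookeeperStore("zookeeper"));
        System.out.println(isZookeeperStore("storm"));
        System.out.println(isZookeeperStore("kafka"));
    }
}
```

Keeping the old value as an alias is a design assumption here; the pull request could equally rename the value outright.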


> Store Kafka offsets with Kafka's consumer offset management api
> ---------------------------------------------------------------
>
>                 Key: STORM-1015
>                 URL: https://issues.apache.org/jira/browse/STORM-1015
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka
>    Affects Versions: 0.11.0
>            Reporter: Hang Sun
>            Priority: Minor
>              Labels: consumer, kafka, offset
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The current Kafka spout stores offsets (and some other state) in ZK using 
> its own proprietary format. This does not work well with other Kafka offset 
> monitoring tools such as Burrow and KafkaOffsetMonitor. In addition, 
> performance does not scale well compared with offsets managed by Kafka's 
> built-in offset management API. I have added a new option for the Kafka spout 
> to store the same data using Kafka's built-in offset management capability. 
> The change is completely backward compatible with the current ZK storage 
> option. The feature can be turned on by a single configuration option. I hope 
> this will help people who want to explore using Kafka's built-in offset 
> management API.
> References:
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> -thanks



