[
https://issues.apache.org/jira/browse/STORM-1015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901495#comment-14901495
]
ASF GitHub Bot commented on STORM-1015:
---------------------------------------
Github user erikdw commented on a diff in the pull request:
https://github.com/apache/storm/pull/705#discussion_r40032400
--- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionStateManagerFactory.java ---
@@ -0,0 +1,68 @@
+package storm.kafka;
+
+import backtype.storm.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartitionStateManagerFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionStateManagerFactory.class);
+
+ private ZkDataStore sharedZkDataStore;
+
+ private Map _stormConf;
+ private SpoutConfig _spoutConfig;
+
+
+ private ZkDataStore createZkDataStore(Map conf, SpoutConfig spoutConfig) {
+ Map _zkDataStoreConf = new HashMap(conf);
+ List<String> zkServers = _spoutConfig.zkServers;
+ if (zkServers == null) {
+ zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+ }
+ Integer zkPort = _spoutConfig.zkPort;
+ if (zkPort == null) {
+ zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+ }
+ _zkDataStoreConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
+ _zkDataStoreConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
+ _zkDataStoreConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
+ return new ZkDataStore(_zkDataStoreConf);
+
+ }
+
+ public PartitionStateManagerFactory(Map stormConf, SpoutConfig spoutConfig) {
+ this._stormConf = stormConf;
+ this._spoutConfig = spoutConfig;
+
+ // default to orignal storm storage format
--- End diff --
typo: original
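
For readers skimming the diff above, the fallback in createZkDataStore (use the spout-level ZooKeeper servers and port when set, otherwise fall back to the cluster-wide Storm settings) can be sketched on its own. This is a minimal illustration, not the PR's code: the class name, method names, and the two config key strings are hypothetical stand-ins for Config.STORM_ZOOKEEPER_SERVERS and Config.STORM_ZOOKEEPER_PORT, and the values are passed in as parameters so the sketch has no Storm dependency.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, standalone sketch of the spout-level -> cluster-level fallback.
final class ZkConfFallbackSketch {

    // Assumed placeholder keys; they stand in for the Storm Config constants
    // referenced in the diff and are not the real key strings.
    static final String CLUSTER_ZK_SERVERS = "example.cluster.zk.servers";
    static final String CLUSTER_ZK_PORT = "example.cluster.zk.port";

    // Prefer the spout-level server list; fall back to the cluster-wide one.
    @SuppressWarnings("unchecked")
    static List<String> resolveServers(List<String> spoutServers, Map<String, Object> conf) {
        if (spoutServers != null) {
            return spoutServers;
        }
        return (List<String>) conf.get(CLUSTER_ZK_SERVERS);
    }

    // Prefer the spout-level port; fall back to the cluster-wide one.
    // The cast to Number tolerates Integer or Long values in the config map.
    static int resolvePort(Integer spoutPort, Map<String, Object> conf) {
        if (spoutPort != null) {
            return spoutPort;
        }
        return ((Number) conf.get(CLUSTER_ZK_PORT)).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(CLUSTER_ZK_SERVERS, Arrays.asList("zk1", "zk2"));
        conf.put(CLUSTER_ZK_PORT, 2181L);

        // No spout-level overrides: both values come from the cluster config.
        System.out.println(resolveServers(null, conf));
        System.out.println(resolvePort(null, conf));

        // Spout-level port override wins over the cluster config.
        System.out.println(resolvePort(2182, conf));
    }
}
```

Taking the overrides as parameters keeps the resolution logic independent of any instance state, which also makes it straightforward to unit test.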
> Store Kafka offsets with Kafka's consumer offset management api
> ---------------------------------------------------------------
>
> Key: STORM-1015
> URL: https://issues.apache.org/jira/browse/STORM-1015
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka
> Affects Versions: 0.11.0
> Reporter: Hang Sun
> Priority: Minor
> Labels: consumer, kafka, offset
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> The current Kafka spout stores offsets (and some other state) in ZK in its
> own proprietary format. This does not work well with Kafka offset monitoring
> tools such as Burrow and KafkaOffsetMonitor. In addition, performance does
> not scale as well as it does with offsets managed by Kafka's built-in offset
> management API. I have added a new option to store the same data using
> Kafka's built-in offset management capability. The change is completely
> backward compatible with the current ZK storage option. The feature can be
> turned on with a single configuration option. I hope this helps people who
> want to explore using Kafka's built-in offset management API.
> References:
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> -thanks