Repository: storm
Updated Branches:
  refs/heads/master 8eb4509d9 -> 4c9506773

Updated grammar and punctuation on storm-kafka/README.md.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/473323e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/473323e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/473323e5

Branch: refs/heads/master
Commit: 473323e5a94a36c9f14fe84053f66ea64e6f8d8e
Parents: 895ba42
Author: Stephen Hopper <[email protected]>
Authored: Mon Sep 7 13:06:57 2015 -0500
Committer: Stephen Hopper <[email protected]>
Committed: Mon Sep 7 13:06:57 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/README.md | 46 +++++++++++++++++++++++-----------------------
 1 file changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/473323e5/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 3fe46e9..91c64bf 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -1,41 +1,41 @@
 Storm Kafka
 ====================

-Provides core storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.
+Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.

 ##Spouts
-We support both trident and core storm spouts. For both spout implementation we use a BrokerHost interface that
-tracks kafka broker host to partition mapping and kafkaConfig that controls some kafka related parameters.
+We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHost interface that
+tracks Kafka broker host to partition mapping and kafkaConfig that controls some Kafka related parameters.

 ###BrokerHosts
-In order to initialize your kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
-Currently we support following two implementations:
+In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
+Currently, we support the following two implementations:

 ####ZkHosts
-ZkHosts is what you should use if you want to dynamically track kafka broker to partition mapping. This class uses
-Kafka's zookeeper's entries to track brokerHost -> partition mapping. You can instantiate an object by calling
+ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
+Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
 ```java
     public ZkHosts(String brokerZkStr, String brokerZkPath)
     public ZkHosts(String brokerZkStr)
 ```
-Where brokerZkStr is just ip:port e.g. localhost:2181. brokerZkPath is the root directory under which all the topics and
-partition information is stored. by Default this is /brokers which is what default kafka implementation uses.
+Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and
+partition information is stored. By default this is /brokers which is what the default Kafka implementation uses.

-By default the broker-partition mapping is refreshed every 60 seconds from zookeeper. If you want to change it you
+By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you
 should set host.refreshFreqSecs to your chosen value.

 ####StaticHosts
 This is an alternative implementation where broker -> partition information is static. In order to construct an instance
-of this class you need to first construct an instance of GlobalPartitionInformation.
+of this class, you need to first construct an instance of GlobalPartitionInformation.

 ```java
     Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
     Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
     Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
     GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
-    partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
-    partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
-    partitionInfo.addPartition(2, brokerForPartition2);//mapping form partition 2 to brokerForPartition2
+    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
     StaticHosts hosts = new StaticHosts(partitionInfo);
 ```

@@ -46,8 +46,8 @@ The second thing needed for constructing a kafkaSpout is an instance of KafkaCon
     public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
 ```

-The BrokerHosts can be any implementation of BrokerHosts interface as described above. the Topic is name of kafka topic.
-The optional ClientId is used as a part of the zookeeper path where the spout's current consumption offset is stored.
+The BrokerHosts can be any implementation of BrokerHosts interface as described above. The topic is name of Kafka topic.
+The optional ClientId is used as a part of the ZooKeeper path where the spout's current consumption offset is stored.

 There are 2 extensions of KafkaConfig currently in use.

@@ -60,7 +60,7 @@ public SpoutConfig(BrokerHosts hosts, String topic, String id);
 ```
 In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
 ```java
-    // setting for how often to save the current kafka offset to ZooKeeper
+    // setting for how often to save the current Kafka offset to ZooKeeper
     public long stateUpdateIntervalMs = 2000;

     // Exponential back-off retry settings. These are used when retrying messages after a bolt
@@ -95,7 +95,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl
 Most of them are self explanatory except MultiScheme.

 ###MultiScheme
-MultiScheme is an interface that dictates how the byte[] consumed from kafka gets transformed into a storm tuple. It
+MultiScheme is an interface that dictates how the byte[] consumed from Kafka gets transformed into a storm tuple. It
 also controls the naming of your output field.

 ```java
@@ -198,8 +198,8 @@ These interfaces have 2 methods defined:
     V getMessageFromTuple(Tuple/TridentTuple tuple);
 ```

-as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field
-as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.java
+As the name suggests, these methods are called to map a tuple to Kafka key and Kafka message. If you just want one field
+as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
 implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you
 use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons.
 Alternatively you could also specify a different key and message field by using the non default constructor.
@@ -213,13 +213,13 @@ public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
 }
 ```
-The implementation of this interface should return topic to which the tuple's key/message mapping needs to be published
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published
 You can return a null and the message will be ignored. If you have one static topic name then you can use
 DefaultTopicSelector.java and set the name of the topic in the constructor.

-### Specifying kafka producer properties
+### Specifying Kafka producer properties
 You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs
-section "Important configuration properties for the producer", in your storm topology config by setting the properties
+section "Important configuration properties for the producer", in your Storm topology config by setting the properties
 map with key kafka.broker.properties.

 ###Putting it all together
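
The StaticHosts snippet touched by this commit builds the partition -> broker map by hand. Its comments note that new Broker("localhost"), new Broker("localhost", 9092) and new Broker("localhost:9092") all resolve to localhost:9092. A minimal, self-contained sketch of that idea follows. It does not use storm-kafka at all. StaticHostsSketch, BrokerAddress and buildExampleMapping are hypothetical names, and the 9092 fallback is taken from the README comments rather than from the library source.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, self-contained model of the static partition -> broker
 * mapping described in the StaticHosts section. BrokerAddress and the
 * helper methods are illustrative names, not storm-kafka classes.
 */
public class StaticHostsSketch {

    // Assumed default broker port, matching the README comments (localhost:9092).
    static final int DEFAULT_PORT = 9092;

    /** An immutable host/port pair for one Kafka broker. */
    static final class BrokerAddress {
        final String host;
        final int port;

        BrokerAddress(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /** Parses "host" or "host:port"; a bare host falls back to DEFAULT_PORT. */
        static BrokerAddress parse(String hostPort) {
            int colon = hostPort.lastIndexOf(':');
            if (colon < 0) {
                return new BrokerAddress(hostPort, DEFAULT_PORT);
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            return new BrokerAddress(host, port);
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Builds the same three-partition layout as the README example. */
    static Map<Integer, BrokerAddress> buildExampleMapping() {
        // TreeMap keeps partitions ordered by id when printed.
        Map<Integer, BrokerAddress> partitionInfo = new TreeMap<>();
        partitionInfo.put(0, BrokerAddress.parse("localhost"));      // port defaulted
        partitionInfo.put(1, new BrokerAddress("localhost", 9092));  // port given explicitly
        partitionInfo.put(2, BrokerAddress.parse("localhost:9092")); // one "host:port" string
        return partitionInfo;
    }

    public static void main(String[] args) {
        buildExampleMapping().forEach((partition, broker) ->
                System.out.println(partition + " -> " + broker));
    }
}
```

Running main prints the three partitions, each mapped to localhost:9092. This matches the point of the README comments: the three Broker constructors are different spellings of the same address. The TreeMap is only there so output is ordered by partition id. With the real library you would build GlobalPartitionInformation and StaticHosts as the README shows.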

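
The KafkaBolt paragraphs in this diff describe two rules. First, the default FieldNameBasedTupleToKafkaMapper reads the fields named "key" and "message", and a non-default constructor lets you pick other names. Second, a KafkaTopicSelector that returns null causes the message to be ignored. The following is a rough sketch of those two rules under stated assumptions. A tuple is modelled as a Map from field name to value so the code runs without Storm. FieldNameMapper, TopicSelector, route and makeTuple are hypothetical names, not the storm-kafka API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the key/message mapping and topic selection rules
 * described for KafkaBolt. Tuples are plain Maps; none of these types are
 * storm-kafka classes.
 */
public class KafkaBoltMappingSketch {

    /** Picks the key and message out of a tuple by field name. */
    static final class FieldNameMapper {
        final String keyField;
        final String messageField;

        // Default constructor: look for fields named "key" and "message".
        FieldNameMapper() {
            this("key", "message");
        }

        // Non-default constructor: the caller chooses the field names.
        FieldNameMapper(String keyField, String messageField) {
            this.keyField = keyField;
            this.messageField = messageField;
        }

        Object getKey(Map<String, Object> tuple) {
            return tuple.get(keyField);
        }

        Object getMessage(Map<String, Object> tuple) {
            return tuple.get(messageField);
        }
    }

    /** Returns the destination topic, or null to drop the tuple. */
    interface TopicSelector {
        String getTopic(Map<String, Object> tuple);
    }

    /** Renders each routed tuple as "topic|key|message", skipping null topics. */
    static List<String> route(List<Map<String, Object>> tuples,
                              FieldNameMapper mapper,
                              TopicSelector selector) {
        List<String> sent = new ArrayList<>();
        for (Map<String, Object> tuple : tuples) {
            String topic = selector.getTopic(tuple);
            if (topic == null) {
                continue; // a null topic means the message is ignored
            }
            sent.add(topic + "|" + mapper.getKey(tuple) + "|" + mapper.getMessage(tuple));
        }
        return sent;
    }

    /** Builds a tuple with the default "key" and "message" fields (values may be null). */
    static Map<String, Object> makeTuple(Object key, Object message) {
        Map<String, Object> t = new HashMap<>();
        t.put("key", key);
        t.put("message", message);
        return t;
    }

    public static void main(String[] args) {
        // Fixed topic, analogous to DefaultTopicSelector with one static topic name.
        TopicSelector fixed = t -> "test-topic";
        // Drops any tuple that has no key by returning null.
        TopicSelector dropKeyless = t -> t.get("key") == null ? null : "test-topic";

        // List.of needs Java 9 or later.
        List<Map<String, Object>> tuples = List.of(makeTuple("k1", "hello"), makeTuple(null, "orphan"));
        System.out.println(route(tuples, new FieldNameMapper(), fixed));
        System.out.println(route(tuples, new FieldNameMapper(), dropKeyless));
    }
}
```

In main, the fixed selector routes both tuples. The second selector returns null for the keyless tuple, so only the first one is routed.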