Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1808#discussion_r91349569
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -1,90 +1,222 @@
    -#Storm Kafka Spout with New Kafka Consumer API
    +#Storm Kafka integration using the kafka-client jar
     
    -Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
    +##Compatability
     
    -The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
    -In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
    +Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
     
    -The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
    +##Writing to Kafka as part of your topology
    +You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
    +are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
    +org.apache.storm.kafka.trident.TridentKafkaUpdater.
     
    -The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
    +You need to provide implementations for the following 2 interfaces
     
    -Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
    +###TupleToKafkaMapper and TridentTupleToKafkaMapper
    +These interfaces have 2 methods defined:
     
    +```java
    +    K getKeyFromTuple(Tuple/TridentTuple tuple);
    +    V getMessageFromTuple(Tuple/TridentTuple tuple);
    +```
    +
    +As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
    +as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
    +implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
    +use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
    +reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
    +In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
    +These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
    +
    +###KafkaTopicSelector and trident KafkaTopicSelector
    +This interface has only one method
    +```java
    +public interface KafkaTopicSelector {
    +    String getTopics(Tuple/TridentTuple tuple);
    +}
    +```
    +The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
    +You can return a null and the message will be ignored. If you have one 
static topic name then you can use
    +DefaultTopicSelector.java and set the name of the topic in the constructor.
    +`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
    +User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
    +When the topic name not found , `KafkaBolt` will write messages into 
default topic .
    +Please make sure the default topic have created .
    +
    +### Specifying Kafka producer properties
    +You can provide all the produce properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
    +Section "Important configuration properties for the producer" for more 
details.
    +
    +###Using wildcard kafka topic match
    +You can do a wildcard topic match by adding the following config
    +```
    +     Config config = new Config();
    +     config.put("kafka.topic.wildcard.match",true);
     
    -# Usage Examples
    +```
     
    -### Create a Kafka Spout
    +After this you can specify a wildcard topic for matching e.g. 
clickstream.*.log.  This will match all streams matching clickstream.my.log, 
clickstream.cart.log etc
     
    -The code snippet bellow is extracted from the example in the module [test] 
(https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test).
 The code that is common for named topics and topic wildcards is in the first 
box. The specific implementations are in the appropriate section. 
     
    -These snippets serve as a reference and do not compile. If you would like 
to reuse this code in your implementation, please obtain it from the test 
module, where it is complete.
    +###Putting it all together
     
    +For the bolt :
     ```java
    -KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
    -
    -KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, 
String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
    -        .setOffsetCommitPeriodMs(10_000)
    -        .setFirstPollOffsetStrategy(EARLIEST)
    -        .setMaxUncommittedOffsets(250)
    -        .build();
    -
    -Map<String, Object> kafkaConsumerProps= new HashMap<>();
    
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,"127.0.0.1:9092");
    
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID,"kafkaSpoutTestGroup");
    
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
    
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
    -
    -KafkaSpoutRetryService retryService = new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
    -        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), 
Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        Fields fields = new Fields("key", "message");
    +        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
    +                    new Values("storm", "1"),
    +                    new Values("trident", "1"),
    +                    new Values("needs", "1"),
    +                    new Values("javadoc", "1")
    +        );
    +        spout.setCycle(true);
    +        builder.setSpout("spout", spout, 5);
    +        //set producer properties.
    +        Properties props = new Properties();
    +        props.put("bootstrap.servers", "localhost:9092");
    --- End diff --
    
    Nit: The property names have constants defined in 
`org.apache.kafka.clients.producer.ProducerConfig`; it might be good to point 
people to them.
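
For illustration, here is a minimal sketch of how the docs example could set producer properties through named constants instead of string literals. The class `ProducerPropsSketch` and the helper `producerProps` are hypothetical names for this example only. The nested `ProducerConfig` is a local stand-in that mirrors a few of the real constants, so the snippet compiles without kafka-clients on the classpath; in an actual topology you would delete it and import `org.apache.kafka.clients.producer.ProducerConfig` instead. The `acks` value and the serializer choice are assumptions, not something the diff specifies.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Stand-in for org.apache.kafka.clients.producer.ProducerConfig so this
    // sketch compiles without kafka-clients on the classpath. In a real
    // topology, remove this nested class and import the real one.
    static final class ProducerConfig {
        static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
        static final String ACKS_CONFIG = "acks";
        static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
        static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";

        private ProducerConfig() {
        }
    }

    // Hypothetical helper: builds producer properties keyed by the constants
    // rather than by hand-typed property names.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Assumed values, chosen only to make the example concrete.
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    }
}
```

The resulting `Properties` object would then be handed to `KafkaBolt.withProducerProperties()` as in the quoted docs. Using the constants means a mistyped property name fails at compile time instead of being silently ignored by the producer.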


