Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/1808#discussion_r96712816
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka consumer API.
-Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility](#compatibility)).
+##Compatibility
-The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`.
-In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards are supported.
-The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`.
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
+are using Trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use `KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces:
-Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
+```java
+ K getKeyFromTuple(Tuple/TridentTuple tuple);
+ V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper, the
+implementation always looks for fields named "key" and "message", for backward compatibility
+reasons. Alternatively you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for key and message, as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
+
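+As an illustration, a custom mapper might look like the minimal sketch below. The class name and the "userId"/"event" field names are hypothetical, and the mapper package is assumed to mirror the storm-kafka module:
+```java
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.tuple.Tuple;
+
+// Maps the hypothetical "userId" field to the Kafka key and the
+// hypothetical "event" field to the Kafka message.
+public class UserEventTupleMapper implements TupleToKafkaMapper<String, String> {
+    @Override
+    public String getKeyFromTuple(Tuple tuple) {
+        return tuple.getStringByField("userId");
+    }
+
+    @Override
+    public String getMessageFromTuple(Tuple tuple) {
+        return tuple.getStringByField("event");
+    }
+}
+```
+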
+###KafkaTopicSelector and Trident KafkaTopicSelector
+This interface has only one method:
+```java
+public interface KafkaTopicSelector {
+    String getTopic(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null, in which case the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to select the topic to publish a tuple to.
+A user just needs to specify the field name or field index for the topic name in the tuple itself.
+When the topic name is not found, the `Field*TopicSelector` will write messages into the default topic.
+Please make sure the default topic has been created.
+
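+As a sketch, a dynamic selector might look like the following. The "tenant" field and the "events-" topic prefix are hypothetical, and the selector package is assumed to mirror the storm-kafka module:
+```java
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import org.apache.storm.tuple.Tuple;
+
+// Routes each tuple to a per-tenant topic; tuples without a tenant
+// return null and are therefore ignored by the KafkaBolt.
+public class TenantTopicSelector implements KafkaTopicSelector {
+    @Override
+    public String getTopic(Tuple tuple) {
+        final String tenant = tuple.getStringByField("tenant");
+        return tenant == null ? null : "events-" + tenant;
+    }
+}
+```
+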
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and
+`TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs,
+section "Important configuration properties for the producer", for more details.
+These are also defined in `org.apache.kafka.clients.producer.ProducerConfig`.
+
+###Using wildcard Kafka topic match
+You can do a wildcard topic match by adding the following config:
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
-# Usage Examples
+```
-### Create a Kafka Spout
+After this you can specify a wildcard topic for matching, e.g. clickstream.*.log. This will match all streams matching clickstream.my.log, clickstream.cart.log, etc.
-The code snippet bellow is extracted from the example in the module [test](https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test). The code that is common for named topics and topic wildcards is in the first box. The specific implementations are in the appropriate section.
-These snippets serve as a reference and do not compile. If you would like to reuse this code in your implementation, please obtain it from the test module, where it is complete.
+###Putting it all together
+For the bolt:
```java
-KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
-
-KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
-        .setOffsetCommitPeriodMs(10_000)
-        .setFirstPollOffsetStrategy(EARLIEST)
-        .setMaxUncommittedOffsets(250)
-        .build();
-
-Map<String, Object> kafkaConsumerProps= new HashMap<>();
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,"127.0.0.1:9092");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID,"kafkaSpoutTestGroup");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-
-KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
-        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
+        TopologyBuilder builder = new TopologyBuilder();
+
+        Fields fields = new Fields("key", "message");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+        builder.setSpout("spout", spout, 5);
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        KafkaBolt bolt = new KafkaBolt()
+                .withProducerProperties(props)
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+        Config conf = new Config();
+
+        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
```
-### Named Topics
+For Trident:
+
```java
-KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})
-        .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
-        .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
-        .build();
-
-KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-        new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
-        new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-        .build();
-
-String[] STREAMS = new String[]{"test_stream", "test1_stream", "test2_stream"};
-String[] TOPICS = new String[]{"test", "test1", "test2"};
-
-Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
-Fields outputFields1 = new Fields("topic", "partition", "offset");
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withProducerProperties(props)
+                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+        Config conf = new Config();
+        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
```
-### Topic Wildcards
+## Reading from Kafka (Spouts)
+
+### Configuration
+
+The spout implementations are configured through the `KafkaSpoutConfig` class. This class uses a Builder pattern and can be started either by calling one of
+the Builder's constructors or by calling the static `builder` method in the KafkaSpoutConfig class.
+
+The constructor or static method used to create the builder requires a few key values (that can be changed later on), which are the minimum config needed to start
+a spout.
+
+`bootstrapServers` is the same as the Kafka consumer property "bootstrap.servers".
+`topics` defines the topics the spout will consume. This can either be a `Collection` of specific topic names (1 or more) or a regular expression `Pattern`, and any
+topics that match that regular expression will be consumed.
+
+In the case of the constructors you may also need to specify a key deserializer and a value deserializer. This is to help make the Java generics happy
+and help maintain type safety. The defaults are `StringDeserializer`s and can be overwritten by calling `setKeyDeserializer` and/or `setValueDeserializer`.
+If these are set to null the code will fall back to what is set in the Kafka properties, but it is preferable to be explicit here, again to maintain
+type safety with the generics.
+
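+As a minimal sketch, creating a spout from the static `builder` method might look like the following; the broker address, topic name, and group id are placeholders, and `setProp` is described below:
+```java
+KafkaSpoutConfig<String, String> spoutConfig =
+        KafkaSpoutConfig.builder("127.0.0.1:9092", "clickstream")  // bootstrapServers, topics
+                .setProp("group.id", "clickstreamGroup")           // plain Kafka consumer property
+                .build();
+
+KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
+```
+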
+There are a few key configs to pay attention to.
+
+`setFirstPollOffsetStrategy` allows you to set where to start consuming data from. This is used both in case of failure recovery and when starting the spout
+for the first time. Allowed values include (see the snippet after this list):
+
+ * `EARLIEST` means that the Kafka spout polls records starting at the first offset of the partition, regardless of previous commits
+ * `LATEST` means that the Kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits
+ * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the Kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `EARLIEST`.
+ * `UNCOMMITTED_LATEST` means that the Kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `LATEST`.
+
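+For example, a spout that always starts from the earliest offset could be configured as sketched below, assuming `FirstPollOffsetStrategy` is the enum nested in `KafkaSpoutConfig`:
+```java
+KafkaSpoutConfig<String, String> spoutConfig =
+        KafkaSpoutConfig.builder("127.0.0.1:9092", "clickstream")
+                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+                .build();
+```
+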
+`setRecordTranslator` allows you to modify how the spout converts a `ConsumerRecord` into a Tuple, and which stream that tuple will go to. By default the "topic",
+"partition", "offset", "key", and "value" fields will be emitted to the "default" stream. If you want to output entries to different streams based on the topic, Storm
+provides `ByTopicRecordTranslator`. See below for more examples on how to use these.
+
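+As a sketch, a custom translation that emits only the key and value to the "default" stream might look like this, assuming a builder overload that accepts a translation function together with the output `Fields`:
+```java
+// Values and Fields are the org.apache.storm.tuple classes.
+KafkaSpoutConfig<String, String> spoutConfig =
+        KafkaSpoutConfig.builder("127.0.0.1:9092", "clickstream")
+                .setRecordTranslator((r) -> new Values(r.key(), r.value()),
+                        new Fields("key", "value"))
+                .build();
+```
+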
+`setProp` can be used to set Kafka properties that do not have a convenience method.
--- End diff ---
"Optional" - should we rename this method to `setKafkaProp` or
`setKafkaBrokerProp` ?