STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: add 
storm-kafka-client doc from 1.x-branch, and link to it from index.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/72fb4170
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/72fb4170
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/72fb4170

Branch: refs/heads/1.0.x-branch
Commit: 72fb41705dbb5a4709aabf1ed4ac53f0eca6284a
Parents: 29fc006
Author: Erik Weathers <eri...@gmail.com>
Authored: Wed Feb 7 00:16:37 2018 -0800
Committer: Erik Weathers <eri...@gmail.com>
Committed: Wed Feb 7 18:53:24 2018 -0800

----------------------------------------------------------------------
 docs/index.md              |   2 +-
 docs/storm-kafka-client.md | 361 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 362 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/72fb4170/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 17f5d80..d3e8808 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -78,7 +78,7 @@ Trident is an alternative interface to Storm. It provides 
exactly-once processin
 * [Event Logging](Eventlogging.html)
 
 ### Integration With External Systems, and Other Libraries
-* [Apache Kafka Integration](storm-kafka.html)
+* [Apache Kafka Integration](storm-kafka.html), [New Kafka Consumer 
Integration](storm-kafka-client.html)
 * [Apache HBase Integration](storm-hbase.html)
 * [Apache HDFS Integration](storm-hdfs.html)
 * [Apache Hive Integration](storm-hive.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/72fb4170/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
new file mode 100644
index 0000000..2992dd6
--- /dev/null
+++ b/docs/storm-kafka-client.md
@@ -0,0 +1,361 @@
+---
+title: Storm Kafka Integration (0.10.x+)
+layout: documentation
+documentation: true
+---
+# Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka consumer API.
+
+## Compatibility
+
+Apache Kafka versions 0.10 onwards
+
+## Writing to Kafka as part of your topology
+You can create an instance of `org.apache.storm.kafka.bolt.KafkaBolt` and attach it as a component to your topology, or, if you
+are using Trident, you can use `org.apache.storm.kafka.trident.TridentKafkaState`, `org.apache.storm.kafka.trident.TridentKafkaStateFactory`, and
+`org.apache.storm.kafka.trident.TridentKafkaStateUpdater`.
+
+You need to provide implementations for the following two interfaces:
+
+### TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
+
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as the key and one field as the value, you can use the provided `FieldNameBasedTupleToKafkaMapper`
+implementation. For backward compatibility, when you use the default constructor of `FieldNameBasedTupleToKafkaMapper`
+in the KafkaBolt, the implementation always looks for fields named "key" and "message".
+Alternatively, you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState there is no default constructor, so you must specify the field names for the key and message
+when constructing an instance of `FieldNameBasedTupleToKafkaMapper`.
+
+### KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+
+```java
+public interface KafkaTopicSelector {
+    String getTopic(Tuple/TridentTuple tuple);
+}
+```
+
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null and the message will be ignored. If you have one static topic name, you can use
+`DefaultTopicSelector` and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to select the topic a tuple should be published to.
+You just need to specify the field name or field index of the topic name in the tuple itself.
+When the topic name is not found, the `Field*TopicSelector` will write messages to the default topic.
+Please make sure the default topic has been created.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more details.
+These are also defined in `org.apache.kafka.clients.producer.ProducerConfig`.
+
+### Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+
+```java
+Config config = new Config();
+config.put("kafka.topic.wildcard.match",true);
+```
+
+After this you can specify a wildcard topic for matching, e.g. `clickstream.*.log`. This will match all topics such as `clickstream.my.log`, `clickstream.cart.log`, etc.
+
+
+### Putting it all together
+
+For the bolt :
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+
+Fields fields = new Fields("key", "message");
+FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+            new Values("storm", "1"),
+            new Values("trident", "1"),
+            new Values("needs", "1"),
+            new Values("javadoc", "1")
+);
+spout.setCycle(true);
+builder.setSpout("spout", spout, 5);
+//set producer properties.
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+
+KafkaBolt bolt = new KafkaBolt()
+        .withProducerProperties(props)
+        .withTopicSelector(new DefaultTopicSelector("test"))
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+Config conf = new Config();
+
+StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+```
+
+For Trident:
+
+```java
+Fields fields = new Fields("word", "count");
+FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+        new Values("storm", "1"),
+        new Values("trident", "1"),
+        new Values("needs", "1"),
+        new Values("javadoc", "1")
+);
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("spout1", spout);
+
+//set producer properties.
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+
+TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+        .withProducerProperties(props)
+        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+        .withTridentTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper("word", "count"));
+stream.partitionPersist(stateFactory, fields, new TridentKafkaStateUpdater(), 
new Fields());
+
+Config conf = new Config();
+StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
+```
+
+## Reading From Kafka (Spouts)
+
+### Configuration
+
+The spout implementations are configured by use of the `KafkaSpoutConfig` class. This class uses a Builder pattern and can be started either by calling one of
+the Builder's constructors or by calling the static method `builder` in the `KafkaSpoutConfig` class.
+
+The constructor or static method used to create the builder requires a few key values. These can be changed later on, but they are the minimum config needed to start
+a spout.
+
+`bootstrapServers` is the same as the Kafka Consumer Property "bootstrap.servers".
+`topics` specifies the topics the spout will consume. It can be either a `Collection` of specific topic names (1 or more) or a regular expression `Pattern`, which specifies
+that any topics that match that regular expression will be consumed.
+
+If you are using the Builder Constructors instead of one of the `builder` 
methods, you will also need to specify a key deserializer and a value 
deserializer.  This is to help guarantee type safety through the use
+of Java generics.  The deserializers can be specified via the consumer 
properties set with `setProp`. See the KafkaConsumer configuration 
documentation for details.
+
+There are a few key configs to pay attention to.
+
+`setFirstPollOffsetStrategy` allows you to set where to start consuming data 
from.  This is used both in case of failure recovery and starting the spout
+for the first time. The allowed values are listed in the 
[FirstPollOffsetStrategy 
javadocs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html).
+
+`setProcessingGuarantee` lets you configure what processing guarantees the 
spout will provide. This affects how soon consumed offsets can be committed, 
and the frequency of commits. See the [ProcessingGuarantee 
javadoc](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html)
 for details.
+
+`setRecordTranslator` allows you to modify how the spout converts a Kafka 
Consumer Record into a Tuple, and which stream that tuple will be published 
into.
+By default the "topic", "partition", "offset", "key", and "value" will be 
emitted to the "default" stream.  If you want to output entries to different
+streams based on the topic, storm provides `ByTopicRecordTranslator`.  See 
below for more examples on how to use these.
+
+`setProp` and `setProps` can be used to set KafkaConsumer properties. The list 
of these properties can be found in the KafkaConsumer configuration 
documentation on the [Kafka 
website](http://kafka.apache.org/documentation.html#consumerconfigs). Note that 
KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will 
throw an exception if the "enable.auto.commit" property is set, and the 
consumer used by the spout will always have that property set to false. You can 
configure similar behavior to autocommit through the `setProcessingGuarantee` 
method on the KafkaSpoutConfig builder.
+
+### Usage Examples
+
+The API is written with Java 8 lambda expressions in mind. It works with Java 7 and below, though.
+
+#### Create a Simple Insecure Spout
+The following will consume all events published to "topic" and send them to 
MyBolt with the fields "topic", "partition", "offset", "key", "value".
+
+```java
+
+final TopologyBuilder tp = new TopologyBuilder();
+tp.setSpout("kafka_spout", new 
KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 
1);
+tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
+...
+```
+
+#### Wildcard Topics
+Wildcard topics will consume from all topics that exist in the specified 
brokers list and match the pattern.  So in the following example
+"topic", "topic_foo" and "topic_bar" will all match the pattern "topic.*", but 
"not_my_topic" would not match. 
+
+```java
+
+final TopologyBuilder tp = new TopologyBuilder();
+tp.setSpout("kafka_spout", new 
KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, 
Pattern.compile("topic.*")).build()), 1);
+tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
+...
+```
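+
+The match semantics can be checked outside Storm. The following is a minimal sketch in plain Java, assuming the pattern must match the whole topic name (which is consistent with "not_my_topic" being excluded above). The class `WildcardTopicDemo` and the helper `matchingTopics` are hypothetical names used for illustration and are not part of storm-kafka-client.
+
+```java
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class WildcardTopicDemo {
+    // Returns the topic names that the pattern matches in full.
+    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
+        return topics.stream()
+                .filter(t -> pattern.matcher(t).matches())
+                .collect(Collectors.toList());
+    }
+
+    public static void main(String[] args) {
+        Pattern pattern = Pattern.compile("topic.*");
+        List<String> topics = Arrays.asList("topic", "topic_foo", "topic_bar", "not_my_topic");
+        // prints [topic, topic_foo, topic_bar]
+        System.out.println(matchingTopics(pattern, topics));
+    }
+}
+```
+
+With `find()` instead of `matches()`, "not_my_topic" would also be selected, so it is worth testing a pattern against your topic names before deploying.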
+
+#### Multiple Streams
+
+This uses Java 8 lambda expressions.
+
+```java
+final TopologyBuilder tp = new TopologyBuilder();
+
+// By default, all topics consumed by the spout and not covered by another rule are emitted to "STREAM_1" as "topic", "key", and "value".
+ByTopicRecordTranslator<String, String> byTopic = new ByTopicRecordTranslator<>(
+    (r) -> new Values(r.topic(), r.key(), r.value()),
+    new Fields("topic", "key", "value"), "STREAM_1");
+// For topic_2, all events are emitted to "STREAM_2" as just "key" and "value".
+byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2");
+
+// Register the translator with the spout config; otherwise the rules above are never applied.
+tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", "topic_2", "topic_3")
+    .setRecordTranslator(byTopic)
+    .build()), 1);
+tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
+tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", 
"STREAM_2");
+...
+```
+
+#### Trident
+
+```java
+final TridentTopology tridentTopology = new TridentTopology();
+final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
+    new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + 
port, Pattern.compile("topic.*")).build()))
+      .parallelismHint(1)
+...
+```
+
+Trident does not support multiple streams and will ignore any streams set for output. If, however, the Fields are not identical for each
+output topic, it will throw an exception and not continue.
+
+### Custom RecordTranslators (ADVANCED)
+
+In most cases the built-in SimpleRecordTranslator and ByTopicRecordTranslator should cover your use case. If you do run into a situation where you need a custom one,
+this documentation describes how to do this properly, and some of the less-than-obvious classes involved.
+
+The point of `apply` is to take a ConsumerRecord and turn it into a 
`List<Object>` that can be emitted.  What is not obvious is how to tell the 
spout to emit it to a
+specific stream.  To do this you will need to return an instance of 
`org.apache.storm.kafka.spout.KafkaTuple`.  This provides a method `routedTo` 
that will say which
+specific stream the tuple should go to.
+
+For Example:
+
+```java
+return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
+```
+
+Will cause the tuple to be emitted on the "bar" stream.
+
+Be careful when writing custom record translators because, just like a Storm spout, they need to be self-consistent. The `streams` method should return
+the full set of streams that this translator will ever try to emit on. Additionally, `getFieldsFor` should return a valid Fields object for each of those
+streams. If you are doing this for Trident, the List returned by `apply` must contain a value for every field in the Fields object for that stream,
+otherwise Trident can throw exceptions.
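+
+The consistency rule can be expressed as a simple check. This is only a sketch in plain Java and does not use the Storm API: the map `FIELDS_BY_STREAM` stands in for what `streams` and `getFieldsFor` would report, and the class and method names are hypothetical.
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TranslatorConsistencyCheck {
+    // Declared output fields per stream (illustrative values only).
+    static final Map<String, List<String>> FIELDS_BY_STREAM = new HashMap<>();
+    static {
+        FIELDS_BY_STREAM.put("STREAM_1", Arrays.asList("topic", "key", "value"));
+        FIELDS_BY_STREAM.put("STREAM_2", Arrays.asList("key", "value"));
+    }
+
+    // True only if the stream is declared and the tuple has one value per declared field.
+    static boolean isConsistent(String stream, List<Object> values) {
+        List<String> fields = FIELDS_BY_STREAM.get(stream);
+        return fields != null && fields.size() == values.size();
+    }
+
+    public static void main(String[] args) {
+        System.out.println(isConsistent("STREAM_1", Arrays.<Object>asList("t", "k", "v"))); // true
+        System.out.println(isConsistent("STREAM_2", Arrays.<Object>asList("t", "k", "v"))); // false: too many values
+        System.out.println(isConsistent("bar", Arrays.<Object>asList("k", "v")));           // false: undeclared stream
+    }
+}
+```
+
+A check along these lines in a unit test for a custom translator may catch mismatches before they surface as runtime exceptions.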
+
+
+### Manual Partition Assignment (ADVANCED)
+
+By default the KafkaSpout instances will be assigned partitions using a round 
robin strategy. If you need to customize partition assignment, you must 
implement the `ManualPartitioner` interface. The implementation can be passed 
to the `ManualPartitionSubscription` constructor, and the `Subscription` can 
then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` 
constructor. Please take care when supplying a custom implementation, since an 
incorrect `ManualPartitioner` implementation could leave some partitions 
unread, or concurrently read by multiple spout instances. See the 
`RoundRobinManualPartitioner` for an example of how to implement this 
functionality.
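+
+To illustrate the property a partitioner must preserve (every partition read by exactly one spout instance), here is a minimal round-robin sketch in plain Java. It deliberately does not implement the `ManualPartitioner` interface: partitions are represented as strings, and `RoundRobinSketch` and `partitionsFor` are hypothetical names, so treat it as an illustration of the idea rather than the library's implementation.
+
+```java
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class RoundRobinSketch {
+    // Assigns every totalTasks-th partition, starting at taskIndex, to this task.
+    // All tasks must see the same sorted list for the assignment to be disjoint and complete.
+    static List<String> partitionsFor(List<String> sortedPartitions, int taskIndex, int totalTasks) {
+        List<String> mine = new ArrayList<>();
+        for (int i = taskIndex; i < sortedPartitions.size(); i += totalTasks) {
+            mine.add(sortedPartitions.get(i));
+        }
+        return mine;
+    }
+
+    public static void main(String[] args) {
+        List<String> partitions = Arrays.asList("test-0", "test-1", "test-2", "test-3", "test-4");
+        for (int task = 0; task < 2; task++) {
+            System.out.println("task " + task + " -> " + partitionsFor(partitions, task, 2));
+        }
+        // prints:
+        // task 0 -> [test-0, test-2, test-4]
+        // task 1 -> [test-1, test-3]
+    }
+}
+```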
+
+## Use the Maven Shade Plugin to Build the Uber Jar
+
+Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>2.4.1</version>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        
<mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+```
+
+Create the uber jar by running the command:
+
+`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
+
+This will create the uber jar file with the name and location matching the 
following pattern:
+ 
+`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
+
+### Run Storm Topology
+
+Copy the file 
`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jar` 
to `STORM_HOME/extlib`
+
+Using the Kafka command line tools, create three topics [test, test1, test2] and use the Kafka console producer to populate the topics with some data.
+
+Execute the command `STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
+
+With debug-level logging enabled, you can see the messages of each topic being redirected to the appropriate bolt, as determined
+by the defined streams and the choice of shuffle grouping.
+
+## Using storm-kafka-client with different versions of kafka
+
+Storm-kafka-client's Kafka dependency is defined as `provided` scope in maven, 
meaning it will not be pulled in
+as a transitive dependency. This allows you to use a version of Kafka 
dependency compatible with your kafka cluster.
+
+When building a project with storm-kafka-client, you must explicitly add the 
Kafka clients dependency. For example, to
+use Kafka-clients 0.10.0.0, you would use the following dependency in your 
`pom.xml`:
+
+```xml
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.0</version>
+        </dependency>
+```
+
+You can also override the kafka clients version while building from maven, 
with parameter `storm.kafka.client.version`
+e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
+
+When selecting a Kafka client version, you should ensure that:
+ 1. The Kafka API is compatible. The storm-kafka-client module only supports the **0.10 or newer** Kafka client API. For older versions,
+ you can use the storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
+ 2. The Kafka client you select is wire compatible with the broker. For example, a 0.9.x client will not work with a
+ 0.8.x broker.
+
+# Kafka Spout Performance Tuning
+
+The Kafka spout provides two internal parameters to control its performance. 
The parameters can be set using the 
[setOffsetCommitPeriodMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setOffsetCommitPeriodMs-long-)
 and 
[setMaxUncommittedOffsets](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setMaxUncommittedOffsets-int-)
 methods. 
+
+* "offset.commit.period.ms" controls how often the spout commits to Kafka
+* "max.uncommitted.offsets" controls how many offsets can be pending commit 
before another poll can take place
+<br/>
+
+The [Kafka consumer config](http://kafka.apache.org/documentation.html#consumerconfigs) parameters may also have an impact on the performance of the spout. The following Kafka parameters are likely the most influential in the spout performance:
+
+* “fetch.min.bytes”
+* “fetch.max.wait.ms”
+* [Kafka 
Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
 instance poll timeout, which is specified for each Kafka spout using the 
[setPollTimeoutMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setPollTimeoutMs-long-)
 method.
+<br/>
+
+Depending on the structure of your Kafka cluster, distribution of the data, 
and availability of data to poll, these parameters will have to be configured 
appropriately. Please refer to the Kafka documentation on Kafka parameter 
tuning.
+
+### Default values
+
+Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment described in this [blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/):
+
+* poll.timeout.ms = 200
+* offset.commit.period.ms = 30000   (30s)
+* max.uncommitted.offsets = 10000000
+<br/>
+
+# Tuple Tracking
+
+By default the spout only tracks emitted tuples when the processing guarantee 
is AT_LEAST_ONCE. It may be necessary to track
+emitted tuples with other processing guarantees to benefit from Storm features 
such as showing complete latency in the UI,
+or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
+
+```java
+// bootstrapServers (String) and topics (String...) are placeholders for your own values.
+KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
+  .builder(bootstrapServers, topics)
+  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
+  .setTupleTrackingEnforced(true)
+  .build();
+```
+
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
