Repository: apex-malhar
Updated Branches:
  refs/heads/master 07812a903 -> 352e2d92c


APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bd502e7b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bd502e7b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bd502e7b

Branch: refs/heads/master
Commit: bd502e7b9f1df2c105d7f9a8044de439463aa299
Parents: 07812a9
Author: chaitanya <chai...@apache.org>
Authored: Tue Sep 27 22:01:50 2016 +0530
Committer: Thomas Weise <t...@apache.org>
Committed: Thu Oct 13 08:37:30 2016 -0700

----------------------------------------------------------------------
 docs/operators/kafkaInputOperator.md | 146 +++++++++++++++++++++++++++++-
 1 file changed, 145 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bd502e7b/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index 1d2258e..cb29e5d 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -11,7 +11,9 @@ Kafka is a pull-based and distributed publish subscribe messaging system, topics
 nodes. Kafka input operator is needed when you want to read data from multiple
 partitions of a Kafka topic in parallel in an Apex application.
 
-### AbstractKafkaInputOperator
+### 0.8 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
 
 This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn’t have any ports.
 
@@ -280,3 +282,145 @@ Below is the configuration for “test” Kafka topic name and
 <value>localhost:2181</value>
 </property>
 ```
+
+
+### 0.9 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka)
+
+This version uses the new 0.9 consumer API; its features are described here. The operator is fault-tolerant and scalable, and supports multiple clusters and multiple topics.
+
+#### Pre-requisites
+
+This operator requires version 0.9.0 or later of the Kafka Consumer API.
+
+#### Ports
+----------
+
+This abstract class doesn't have any ports.
+
+#### Configuration properties
+----------------------------
+
+-   ***clusters*** - String[]
+    -   Mandatory Parameter.
+    -   Specifies the Kafka clusters that you want to consume messages from. To configure multi-cluster support, you need to specify the clusters separated by ";".
+    
+-   ***topics*** - String[]
+    -   Mandatory Parameter.
+    -   Specifies the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by ",".
+
+-   ***strategy*** - PartitionStrategy
+    -   The operator supports two types of partitioning strategies: ONE_TO_ONE and ONE_TO_MANY.
+
+        ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition, so the number of Kafka topic partitions equals the number of operator instances.
+
+        ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances, where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and the number of Kafka partitions (N) = 2, then the AppMaster creates 2 Kafka input operator instances.
+
+        Default Value = PartitionStrategy.ONE_TO_ONE.
+
+-   ***initialPartitionCount*** - Integer
+    -   When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances.
+        Default Value = 1.
+
+-   ***repartitionInterval*** - Long
+    -   Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions.
+        Default Value = 30 Seconds.
+
+-   ***repartitionCheckInterval*** - Long
+    -   Interval specified in milliseconds. This value specifies the minimum interval between two stat checks.
+        Default Value = 5 Seconds.
+
+-   ***maxTuplesPerWindow*** - Integer
+    -   Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1.
+        Default value = Integer.MAX_VALUE.
+
+-   ***initialOffset*** - InitialOffset
+    -   Indicates the type of offset, i.e., EARLIEST, LATEST, APPLICATION_OR_EARLIEST, or APPLICATION_OR_LATEST.
+        LATEST => The operator consumes messages from the latest point of the Kafka queue.
+        EARLIEST => The operator consumes messages from the beginning of the Kafka queue.
+        APPLICATION_OR_EARLIEST => The operator consumes messages from the position committed in the last run. If there is no committed offset, it starts consuming from the beginning of the Kafka queue.
+        APPLICATION_OR_LATEST => The operator consumes messages from the position committed in the last run. If there is no committed offset, it starts consuming from the latest position of the queue.
+        Default value = InitialOffset.APPLICATION_OR_LATEST.
+
+-   ***metricsRefreshInterval*** - Long
+    -   Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.
+        Default value = 5 Seconds.
+
+-   ***consumerTimeout*** - Long
+    -   Indicates how long poll() waits when data is not available. Please refer to the link below:
+        http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll
+        Default value = 5 Seconds.
+
+-   ***holdingBufferSize*** - Long
+    -   Indicates the maximum number of messages kept in memory for emitting.
+        Default value = 1024.
+
+-   ***consumerProps*** - Properties
+    -   Specifies consumer properties that are not already set on the operator. Please refer to the link below for consumer properties:
+        http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+
+-   ***windowDataManager*** - WindowDataManager
+    -   Specifies that the operator processes the same set of messages in a window before and after a failure (idempotency). This is important, but it comes at a higher cost because at the end of each window the operator needs to persist some state with respect to that window.
+        Default value = WindowDataManager.NoopWindowDataManager.
+        
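The ONE_TO_MANY arithmetic described under the strategy property above can be sketched in plain Java. This is an illustrative helper for the math only (class and method names are hypothetical, not part of the Malhar API):

```java
// Sketch of the ONE_TO_MANY partitioning math described above.
// These helpers are hypothetical and not part of the Malhar API.
public class PartitionMath
{
  // K = min(initialPartitionCount, N), where N is the number of topic partitions
  public static int operatorCount(int initialPartitionCount, int n)
  {
    return Math.min(initialPartitionCount, n);
  }

  // Round-robin assignment of topic partition ids 0..n-1 across k operator instances
  public static int operatorFor(int partitionId, int k)
  {
    return partitionId % k;
  }

  public static void main(String[] args)
  {
    // Example from the text: initialPartitionCount = 5, N = 2 -> 2 instances
    System.out.println(operatorCount(5, 2)); // prints 2
    // initialPartitionCount = 2, N = 5 -> still 2 instances;
    // partition 4 is assigned round-robin to instance 0
    System.out.println(operatorCount(2, 5)); // prints 2
    System.out.println(operatorFor(4, 2));   // prints 0
  }
}
```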
+#### Abstract Methods
+
+void emitTuple(String cluster, ConsumerRecord&lt;byte[], byte[]&gt; message): Abstract method that emits tuples extracted from the Kafka message.
+### Concrete Classes
+
+#### KafkaSinglePortInputOperator
+This class extends AbstractKafkaInputOperator and defines the emitTuple() method, which extracts the byte array from the Kafka message.
+
+#### Ports
+outputPort &lt;byte[]&gt;: Tuples extracted from Kafka messages are emitted through this port.
+
+### Application Example
+This section builds an Apex application using the Kafka input operator.
+Below is the code snippet:
+
+```java
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+@ApplicationAnnotation(name = "KafkaApp")
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+    dag.addStream("MessageData", input.outputPort, output.input);
+  }
+}
+```
+Below is the configuration for the “test” Kafka topic and “localhost:9092” as the broker:
+
+```xml
+<property>
+<name>dt.operator.MessageReader.prop.topics</name>
+<value>test</value>
+</property>
+
+<property>
+<name>dt.operator.MessageReader.prop.clusters</name>
+<value>localhost:9092</value>
+</property>
+```
+
+By adding the following lines to the properties file, the Kafka input operator supports multiple topics and multiple clusters:
+ 
+```xml
+<property>
+<name>dt.operator.MessageReader.prop.topics</name>
+<value>test1, test2</value>
+</property>
+ 
+<property>
+<name>dt.operator.MessageReader.prop.clusters</name>
+<value>localhost:9092; localhost:9093; localhost:9094</value>
+</property>
+```
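The separator conventions used in these values (commas between topics, semicolons between clusters) can be illustrated with plain string handling. This is a standalone sketch of the parsing convention, not actual operator code:

```java
// Standalone illustration of the separator conventions described above:
// topics are comma-separated, clusters are semicolon-separated.
public class SeparatorDemo
{
  // Split a topics value such as "test1, test2" on commas, trimming whitespace
  public static String[] parseTopics(String value)
  {
    return value.trim().split("\\s*,\\s*");
  }

  // Split a clusters value such as "localhost:9092; localhost:9093" on semicolons
  public static String[] parseClusters(String value)
  {
    return value.trim().split("\\s*;\\s*");
  }

  public static void main(String[] args)
  {
    System.out.println(parseTopics("test1, test2").length);                                     // prints 2
    System.out.println(parseClusters("localhost:9092; localhost:9093; localhost:9094").length); // prints 3
  }
}
```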
+
+For more details about the example application, please refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka.
