Repository: apex-malhar
Updated Branches:
  refs/heads/master 07812a903 -> 352e2d92c
APEXMALHAR-2242 Added user documentation for 0.9 version of Kafka Input Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bd502e7b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bd502e7b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bd502e7b

Branch: refs/heads/master
Commit: bd502e7b9f1df2c105d7f9a8044de439463aa299
Parents: 07812a9
Author: chaitanya <chai...@apache.org>
Authored: Tue Sep 27 22:01:50 2016 +0530
Committer: Thomas Weise <t...@apache.org>
Committed: Thu Oct 13 08:37:30 2016 -0700

----------------------------------------------------------------------
 docs/operators/kafkaInputOperator.md | 146 +++++++++++++++++++++++++++++-
 1 file changed, 145 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bd502e7b/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index 1d2258e..cb29e5d 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -11,7 +11,9 @@ Kafka is a pull-based and distributed publish subscribe messaging system, topics
 nodes. Kafka input operator is needed when you want to read data from multiple partitions of a Kafka topic in parallel in an Apex application.
 
-### AbstractKafkaInputOperator
+### 0.8 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
 
 This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn’t have any ports.
@@ -280,3 +282,145 @@ Below is the configuration for “test” Kafka topic name and
   <value>localhost:2181</value>
 </property>
 ```
+
+
+### 0.9 Version of Kafka Input Operator
+
+### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka)
+
+This version uses the new consumer API introduced in Kafka 0.9. Its features are described below. The operator is fault-tolerant and scalable. It can consume from multiple clusters and multiple topics.
+
+#### Pre-requisites
+
+This operator requires version 0.9.0 or later of the Kafka Consumer API.
+
+#### Ports
+----------
+
+This abstract class doesn't have any ports.
+
+#### Configuration properties
+----------------------------
+
+- ***clusters*** - String[]
+  - Mandatory Parameter.
+  - Specifies the Kafka clusters that you want to consume messages from. To configure multi-cluster support, specify the clusters separated by ";".
+
+- ***topics*** - String[]
+  - Mandatory Parameter.
+  - Specifies the Kafka topics that you want to consume messages from. To configure multi-topic support, specify the topics separated by ",".
+
+- ***strategy*** - PartitionStrategy
+  - The operator supports two partitioning strategies: ONE_TO_ONE and ONE_TO_MANY.
+  - ONE_TO_ONE: The AppMaster creates one input operator instance per Kafka topic partition. The number of operator instances therefore equals the number of Kafka topic partitions.
+  - ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances, where N is the number of Kafka topic partitions. If K is less than N, the N topic partitions are distributed across the K operator instances in round-robin fashion, so some instances read more than one partition. If initialPartitionCount is greater than N, then K = N and the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and the number of Kafka partitions (N) = 2, the AppMaster creates 2 Kafka input operator instances.
+  - Default value = PartitionStrategy.ONE_TO_ONE.
+
+- ***initialPartitionCount*** - Integer
+  - When the ONE_TO_MANY partition strategy is enabled, this value specifies the requested number of Kafka input operator instances. The actual number is capped at the number of Kafka topic partitions.
+    Default value = 1.
+
+- ***repartitionInterval*** - Long
+  - Interval in milliseconds. This value specifies the minimum time between two repartition actions.
+    Default value = 30 seconds.
+
+- ***repartitionCheckInterval*** - Long
+  - Interval in milliseconds. This value specifies the minimum time between two stat checks.
+    Default value = 5 seconds.
+
+- ***maxTuplesPerWindow*** - Integer
+  - Controls the maximum number of messages this operator emits in each streaming window. The minimum value is 1.
+    Default value = Integer.MAX_VALUE.
+
+- ***initialOffset*** - InitialOffset
+  - Specifies where the operator starts consuming. Supported values are EARLIEST, LATEST, APPLICATION_OR_EARLIEST and APPLICATION_OR_LATEST.
+  - LATEST: The operator consumes messages starting from the latest position of the Kafka queue.
+  - EARLIEST: The operator consumes messages starting from the beginning of the Kafka queue.
+  - APPLICATION_OR_EARLIEST: The operator consumes messages from the position committed in the last run. If there is no committed offset, it starts consuming from the beginning of the Kafka queue.
+  - APPLICATION_OR_LATEST: The operator consumes messages from the position committed in the last run. If there is no committed offset, it starts consuming from the latest position of the Kafka queue.
+  - Default value = InitialOffset.APPLICATION_OR_LATEST.
+
+- ***metricsRefreshInterval*** - Long
+  - Interval in milliseconds. This value specifies the minimum time between two metric stat updates.
+    Default value = 5 seconds.
+
+- ***consumerTimeout*** - Long
+  - The time a poll of the Kafka consumer waits for data if none is available. For details, see:
+    http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll
+    Default value = 5 seconds.
+
+- ***holdingBufferSize*** - Long
+  - The maximum number of messages held in memory while waiting to be emitted.
+    Default value = 1024.
+
+- ***consumerProps*** - Properties
+  - Additional Kafka consumer properties that are not set through the operator's own properties. For the list of consumer properties, see:
+    http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+
+- ***windowDataManager*** - WindowDataManager
+  - Ensures that the operator processes the same set of messages in a window before and after a failure. This guarantee is important, but it has a higher cost. At the end of each window, the operator must persist the state associated with that window.
+    Default value = WindowDataManager.NoopWindowDataManager.
+
+#### Abstract Methods
+
+void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message): Abstract method that emits tuples extracted from a Kafka message.
+
+### Concrete Classes
+
+#### KafkaSinglePortInputOperator
+This class extends AbstractKafkaInputOperator and implements the emitTuple() method, which emits the byte array extracted from each Kafka message.
+
+#### Ports
+outputPort <byte[]>: Tuples extracted from Kafka messages are emitted through this port.
+
+### Application Example
+This section builds an Apex application using the Kafka input operator.
+Below is the code snippet:
+
+```java
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+@ApplicationAnnotation(name = "KafkaApp")
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    // Reads messages from Kafka; clusters and topics are set in the properties file
+    KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
+
+    // Prints each received tuple (the byte[] extracted from a Kafka message) to the console
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+    // Connects the Kafka operator's output port to the console operator's input port
+    dag.addStream("MessageData", input.outputPort, output.input);
+  }
+}
+```
+Below is the configuration for the “test” Kafka topic, with “localhost:9092” as the broker:
+
+```xml
+<property>
+  <name>dt.operator.MessageReader.prop.topics</name>
+  <value>test</value>
+</property>
+
+<property>
+  <name>dt.operator.MessageReader.prop.clusters</name>
+  <value>localhost:9092</value>
+</property>
+```
+
+To enable multi-topic and multi-cluster support, set the properties as follows:
+
+```xml
+<property>
+  <name>dt.operator.MessageReader.prop.topics</name>
+  <value>test1,test2</value>
+</property>
+
+<property>
+  <name>dt.operator.MessageReader.prop.clusters</name>
+  <value>localhost:9092;localhost:9093;localhost:9094</value>
+</property>
+```
+
+For more details about the example application, please refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka.
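The ONE_TO_MANY rule in the strategy property can be checked with a small model. That rule is K = min(initialPartitionCount, N) operator instances, with partitions assigned round-robin. What follows is a minimal sketch under stated assumptions. It is not the operator's own partitioner. The class OneToManyAssignment and its assign method are hypothetical names. The sketch represents the Kafka partitions simply as the indices 0 to N-1 of a single topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ONE_TO_MANY rule described in the strategy property.
// This is NOT the partitioner shipped with the Kafka input operator.
class OneToManyAssignment
{
  // Returns one list per operator instance. Each inner list holds the indices
  // (0..numKafkaPartitions-1) of the Kafka partitions assigned to that instance.
  static List<List<Integer>> assign(int initialPartitionCount, int numKafkaPartitions)
  {
    if (initialPartitionCount < 1) {
      throw new IllegalArgumentException("initialPartitionCount must be at least 1");
    }
    if (numKafkaPartitions < 0) {
      throw new IllegalArgumentException("numKafkaPartitions must not be negative");
    }
    // K = min(initialPartitionCount, N): never more instances than partitions
    int k = Math.min(initialPartitionCount, numKafkaPartitions);
    List<List<Integer>> instances = new ArrayList<>(k);
    for (int i = 0; i < k; i++) {
      instances.add(new ArrayList<>());
    }
    // Round-robin: partition p goes to instance p % K
    for (int p = 0; p < numKafkaPartitions; p++) {
      instances.get(p % k).add(p);
    }
    return instances;
  }

  public static void main(String[] args)
  {
    // initialPartitionCount = 5, N = 2: prints [[0], [1]]
    System.out.println(assign(5, 2));
    // initialPartitionCount = 2, N = 5: prints [[0, 2, 4], [1, 3]]
    System.out.println(assign(2, 5));
  }
}
```

With initialPartitionCount = 5 and N = 2, the model yields two instances that each read one partition. This matches the example in the strategy description. With initialPartitionCount = 2 and N = 5, one instance reads three partitions and the other reads two.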