APEXMALHAR-1904 #comment
Add new KafkaInputOperator using kafka 0.9.x consumer API
Features include:
Out-of-box one-to-one and one-to-many partition scheme support plus customizable partition scheme
Fault tolerance: when the input operator goes down, it redeploys on another node
At-least-once semantics for operator failure (no matter which operator fails)
At-least-once semantics for cold restart (no data loss even if you restart the application)
Multi-cluster support: one operator can consume data from more than one Kafka cluster
Multi-topic support: one operator can subscribe to multiple topics
Throughput control support: you can throttle the number of tuples per streaming window
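
A minimal usage sketch against this API (illustrative only; the operator and stream names, broker
addresses, topic names, and the downstream operator are placeholders, not part of this commit):

  // in populateDAG(DAG dag, Configuration conf) of a hypothetical Apex application
  KafkaSinglePortInputOperator in =
      dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
  // one or more clusters separated by ';', each value in bootstrap.servers form
  in.setClusters("broker1:9092,broker2:9092;other-cluster-broker:9092");
  in.setTopics("topic_a", "topic_b");              // multi-topic subscription
  in.setInitialOffset("APPLICATION_OR_EARLIEST");  // resume from committed offsets, else earliest
  in.setStrategy("ONE_TO_MANY");                   // fixed operator count, several Kafka partitions each
  in.setInitialPartitionCount(4);
  in.setMaxTuplesPerWindow(10000);                 // throughput control per streaming window
  dag.addStream("kafkaBytes", in.outputPort, downstream.input);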
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7ee2c7e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7ee2c7e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7ee2c7e3
Branch: refs/heads/devel-3
Commit: 7ee2c7e3697663a5fad3112c9a780a5ffd4c3b11
Parents: 8790077
Author: Siyuan Hua <[email protected]>
Authored: Wed Jan 6 11:11:46 2016 -0800
Committer: Siyuan Hua <[email protected]>
Committed: Wed Jan 6 11:11:46 2016 -0800
----------------------------------------------------------------------
kafka/XmlJavadocCommentsExtractor.xsl | 48 ++
kafka/pom.xml | 229 ++++++++
.../kafka/AbstractKafkaInputOperator.java | 527 +++++++++++++++++++
.../malhar/kafka/AbstractKafkaPartitioner.java | 278 ++++++++++
.../apex/malhar/kafka/KafkaConsumerWrapper.java | 328 ++++++++++++
.../apache/apex/malhar/kafka/KafkaMetrics.java | 136 +++++
.../apex/malhar/kafka/KafkaPartition.java | 143 +++++
.../kafka/KafkaSinglePortInputOperator.java | 44 ++
.../apex/malhar/kafka/OneToManyPartitioner.java | 66 +++
.../apex/malhar/kafka/OneToOnePartitioner.java | 56 ++
.../apex/malhar/kafka/PartitionStrategy.java | 38 ++
.../malhar/kafka/KafkaInputOperatorTest.java | 216 ++++++++
.../malhar/kafka/KafkaOperatorTestBase.java | 308 +++++++++++
.../apex/malhar/kafka/KafkaTestPartitioner.java | 62 +++
.../apex/malhar/kafka/KafkaTestProducer.java | 169 ++++++
pom.xml | 1 +
16 files changed, 2649 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/kafka/XmlJavadocCommentsExtractor.xsl b/kafka/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..ec72325
--- /dev/null
+++ b/kafka/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!--
+ Document : XmlJavadocCommentsExtractor.xsl
+ Created on : September 16, 2014, 11:30 AM
+ Description:
+  The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+ <xsl:output method="xml" standalone="yes"/>
+
+ <!-- copy xml by selecting only the following nodes, attrbutes and text -->
+ <xsl:template match="node()|text()|@*">
+ <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+ </xsl:copy>
+ </xsl:template>
+
+ <!-- Strip off the following paths from the selected xml -->
+ <xsl:template match="//root/package/interface/interface
+ |//root/package/interface/method/@qualified
+ |//root/package/class/interface
+ |//root/package/class/class
+ |//root/package/class/method/@qualified
+ |//root/package/class/field/@qualified" />
+
+ <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100755
index 0000000..ff47524
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,229 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar</artifactId>
+ <version>3.3.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-kafka</artifactId>
+ <name>Apache Apex Malhar (incubating) Kafka Support</name>
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <!-- Publish tests jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- create resource directory for xml javadoc-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>createJavadocDirectory</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <tasks>
+              <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+              <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+      <!-- generate javadoc -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <!-- generate xml javadoc -->
+ <execution>
+ <id>xml-doclet</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>javadoc</goal>
+ </goals>
+ <configuration>
+ <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+ <useStandardDocletOptions>false</useStandardDocletOptions>
+ <docletArtifact>
+ <groupId>com.github.markusbernhardt</groupId>
+ <artifactId>xml-doclet</artifactId>
+ <version>1.0.4</version>
+ </docletArtifact>
+ </configuration>
+ </execution>
+ <!-- generate default javadoc jar with custom tags -->
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <skip>true</skip>
+ <tags>
+ <tag>
+ <name>customTag1</name>
+ <placement>a</placement>
+ <head>Custom Tag One:</head>
+ </tag>
+ <tag>
+ <name>customTag2</name>
+ <placement>a</placement>
+ <head>Custom Tag two:</head>
+ </tag>
+ <tag>
+ <name>customTag3</name>
+ <placement>a</placement>
+ <head>Custom Tag three:</head>
+ </tag>
+ </tags>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>xml-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>transform-xmljavadoc</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>transform</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <transformationSets>
+ <transformationSet>
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+ <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+ </transformationSet>
+ </transformationSets>
+ </configuration>
+ </plugin>
+ <!-- copy xml javadoc to class jar -->
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes</outputDirectory>
+ <resources>
+ <resource>
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+ <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.9.0.0</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.9.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.core.version}</version>
+ <type>jar</type>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
new file mode 100644
index 0000000..4f2f704
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import com.google.common.base.Joiner;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+
+/**
+ * The abstract Kafka input operator using the Kafka 0.9.0 new consumer API.
+ * A scalable, fault-tolerant, at-least-once Kafka input operator.
+ * Key features include:
+ *
+ * <ol>
+ * <li>Out-of-box one-to-one and one-to-many partition strategy support plus customizable partition strategy,
+ * refer to AbstractKafkaPartitioner</li>
+ * <li>Fault tolerance: when the input operator goes down, it redeploys on another node</li>
+ * <li>At-least-once semantics for operator failure (no matter which operator fails)</li>
+ * <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li>
+ * <li>Multi-cluster support: one operator can consume data from more than one Kafka cluster</li>
+ * <li>Multi-topic support: one operator can subscribe to multiple topics</li>
+ * <li>Throughput control support: you can throttle the number of tuples per streaming window</li>
+ * </ol>
+ */
+public abstract class AbstractKafkaInputOperator implements InputOperator,
+    Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener,
+    Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
+
+ public enum InitialOffset
+ {
+    EARLIEST, // consume from the beginning of the partition every time the application restarts
+    LATEST, // consume from the latest offset of the partition every time the application restarts
+    APPLICATION_OR_EARLIEST, // consume from the committed position of the last run, or earliest if there is no committed offset
+    APPLICATION_OR_LATEST // consume from the committed position of the last run, or latest if there is no committed offset
+ }
+
+ @NotNull
+ private String[] clusters;
+
+ @NotNull
+ private String[] topics;
+
+ /**
+ * offset track for checkpoint
+ */
+  private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
+
+ private int initialPartitionCount = 1;
+
+ private long repartitionInterval = 30000L;
+
+ private long repartitionCheckInterval = 5000L;
+
+ @Min(1)
+ private int maxTuplesPerWindow = Integer.MAX_VALUE;
+
+ /**
+   * By default the operator starts consuming from the committed offset or the latest one
+ */
+ private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;
+
+ private long metricsRefreshInterval = 5000L;
+
+ private long consumerTimeout = 5000L;
+
+ private int holdingBufferSize = 1024;
+
+ private Properties consumerProps;
+
+ /**
+ * Assignment for each operator instance
+ */
+ private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
+
+ //=======================All transient fields==========================
+
+ /**
+ * Wrapper consumer object
+   * It wraps KafkaConsumer, maintains the consumer thread and stores messages in a queue
+ */
+  private transient final KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper();
+
+ /**
+ * By default the strategy is one to one
+ * @see PartitionStrategy
+ */
+ private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
+
+ /**
+ * count the emitted message in each window<br>
+ * non settable
+ */
+ private transient int emitCount = 0;
+
+ /**
+   * store offsets with window id, only keep offsets with windows that have not been committed
+ */
+  private transient final List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
+
+ /**
+ * Application name is used as group.id for kafka consumer
+ */
+ private transient String applicationName;
+
+ private transient AbstractKafkaPartitioner partitioner;
+
+ private transient long currentWindowId;
+
+ private transient long lastCheckTime = 0L;
+
+ private transient long lastRepartitionTime = 0L;
+
+ @AutoMetric
+ private transient KafkaMetrics metrics;
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ consumerWrapper.start();
+ }
+
+ @Override
+ public void deactivate()
+ {
+ consumerWrapper.stop();
+ }
+
+ @Override
+ public void checkpointed(long l)
+ {
+
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ //ask kafka consumer wrapper to store the committed offsets
+    for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
+      Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
+ if (item.getLeft() <= windowId) {
+ if (item.getLeft() == windowId) {
+ consumerWrapper.commitOffsets(item.getRight());
+ }
+ iter.remove();
+ }
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ int count = consumerWrapper.messageSize();
+ if (maxTuplesPerWindow > 0) {
+ count = Math.min(count, maxTuplesPerWindow - emitCount);
+ }
+ for (int i = 0; i < count; i++) {
+      Pair<String, ConsumerRecord<byte[], byte[]>> tuple = consumerWrapper.pollMessage();
+ ConsumerRecord<byte[], byte[]> msg = tuple.getRight();
+ emitTuple(tuple.getLeft(), msg);
+      AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
+          msg.topic(), msg.partition());
+ offsetTrack.put(pm, msg.offset());
+ }
+ emitCount += count;
+ }
+
+  protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message);
+
+ @Override
+ public void beginWindow(long wid)
+ {
+ emitCount = 0;
+ currentWindowId = wid;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ // copy current offset track to history memory
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new HashMap<>(offsetTrack);
+ offsetHistory.add(Pair.of(currentWindowId, offsetsWithWindow));
+
+ //update metrics
+ metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());
+ }
+
+
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+ consumerWrapper.create(this);
+ metrics = new KafkaMetrics(metricsRefreshInterval);
+ }
+
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ private void initPartitioner()
+ {
+ if (partitioner == null) {
+ logger.info("Initialize Partitioner");
+ switch (strategy) {
+ case ONE_TO_ONE:
+ partitioner = new OneToOnePartitioner(clusters, topics, this);
+ break;
+ case ONE_TO_MANY:
+ partitioner = new OneToManyPartitioner(clusters, topics, this);
+ break;
+ case ONE_TO_MANY_HEURISTIC:
+ throw new UnsupportedOperationException("Not implemented yet");
+ default:
+ throw new RuntimeException("Invalid strategy");
+ }
+ logger.info("Actual Partitioner is {}", partitioner.getClass());
+ }
+
+ }
+
+ @Override
+ public Response processStats(BatchedOperatorStats batchedOperatorStats)
+ {
+ long t = System.currentTimeMillis();
+ if (repartitionInterval < 0 || repartitionCheckInterval < 0 ||
+        t - lastCheckTime < repartitionCheckInterval || t - lastRepartitionTime < repartitionInterval) {
+      // return false if it's within repartitionCheckInterval since the last time it checked the stats
+ Response response = new Response();
+ response.repartitionRequired = false;
+ return response;
+ }
+
+ try {
+ logger.debug("Process stats");
+ initPartitioner();
+ return partitioner.processStats(batchedOperatorStats);
+ } finally {
+ lastCheckTime = System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+      Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
+ {
+ logger.debug("Define partitions");
+ initPartitioner();
+ return partitioner.definePartitions(collection, partitioningContext);
+ }
+
+ @Override
+  public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
+ {
+ // update the last repartition time
+ lastRepartitionTime = System.currentTimeMillis();
+ initPartitioner();
+ partitioner.partitioned(map);
+ }
+
+ /**
+ *
+ * A callback from consumer after it commits the offset
+ * @param map
+ * @param e
+ */
+  public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)
+ {
+ if (logger.isDebugEnabled()) {
+      logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map));
+ }
+ if (e != null) {
+ logger.warn("Exceptions in committing offsets {} : {} ",
+ Joiner.on(';').withKeyValueSeparator("=").join(map), e);
+ }
+ }
+
+ public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
+ {
+ this.assignment = assignment;
+ }
+
+ public Set<AbstractKafkaPartitioner.PartitionMeta> assignment()
+ {
+ return assignment;
+ }
+
+  //---------------------------------------------setters and getters----------------------------------------
+ public void setInitialPartitionCount(int partitionCount)
+ {
+ this.initialPartitionCount = partitionCount;
+ }
+
+ /**
+ * initial partition count
+ * only used with PartitionStrategy.ONE_TO_MANY
+ * or customized strategy
+ */
+ public int getInitialPartitionCount()
+ {
+ return initialPartitionCount;
+ }
+
+ public void setClusters(String clusters)
+ {
+ this.clusters = clusters.split(";");
+ }
+
+ /**
+ * Same setting as bootstrap.servers property to KafkaConsumer
+ * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
+   * To support multi cluster, you can have multiple bootstrap.servers separated by ";"
+ */
+ public String getClusters()
+ {
+ return Joiner.on(';').join(clusters);
+ }
+
+ public void setTopics(String... topics)
+ {
+ this.topics = topics;
+ }
+
+ /**
+ * The topics the operator consumes
+ */
+ public String[] getTopics()
+ {
+ return topics;
+ }
+
+ public void setStrategy(String policy)
+ {
+ this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
+ }
+
+ public String getStrategy()
+ {
+ return strategy.name();
+ }
+
+ public void setInitialOffset(String initialOffset)
+ {
+ this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase());
+ }
+
+ /**
+ * Initial offset, it should be one of the following
+ * <ul>
+ * <li>earliest</li>
+ * <li>latest</li>
+ * <li>application_or_earliest</li>
+ * <li>application_or_latest</li>
+ * </ul>
+ */
+ public String getInitialOffset()
+ {
+ return initialOffset.name();
+ }
+
+ public String getApplicationName()
+ {
+ return applicationName;
+ }
+
+ public void setConsumerProps(Properties consumerProps)
+ {
+ this.consumerProps = consumerProps;
+ }
+
+ /**
+ * Extra kafka consumer properties
+ * http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+ *
+   * Please be aware that the properties below are set by the operator, don't override them
+ *
+ * <ul>
+ * <li>bootstrap.servers</li>
+ * <li>group.id</li>
+ * <li>auto.offset.reset</li>
+ * <li>enable.auto.commit</li>
+ * <li>partition.assignment.strategy</li>
+ * <li>key.deserializer</li>
+ * <li>value.deserializer</li>
+ * </ul>
+ *
+ */
+ public Properties getConsumerProps()
+ {
+ return consumerProps;
+ }
+
+ public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
+ {
+ this.maxTuplesPerWindow = maxTuplesPerWindow;
+ }
+
+ /**
+ * maximum tuples allowed to be emitted in each window
+ */
+ public int getMaxTuplesPerWindow()
+ {
+ return maxTuplesPerWindow;
+ }
+
+ /**
+   * @see <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">
+ * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
+ */
+ public long getConsumerTimeout()
+ {
+ return consumerTimeout;
+ }
+
+ public void setConsumerTimeout(long consumerTimeout)
+ {
+ this.consumerTimeout = consumerTimeout;
+ }
+
+ /**
+   * Number of messages kept in memory waiting for emission to downstream operator
+ */
+ public int getHoldingBufferSize()
+ {
+ return holdingBufferSize;
+ }
+
+ public void setHoldingBufferSize(int holdingBufferSize)
+ {
+ this.holdingBufferSize = holdingBufferSize;
+ }
+
+ /**
+ * metrics refresh interval
+ */
+ public long getMetricsRefreshInterval()
+ {
+ return metricsRefreshInterval;
+ }
+
+ public void setMetricsRefreshInterval(long metricsRefreshInterval)
+ {
+ this.metricsRefreshInterval = metricsRefreshInterval;
+ }
+
+ public void setRepartitionCheckInterval(long repartitionCheckInterval)
+ {
+ this.repartitionCheckInterval = repartitionCheckInterval;
+ }
+
+ /**
+   * Minimal interval between checking collected stats and deciding whether to repartition or not.
+   * And the minimal interval between 2 offset updates
+ */
+ public long getRepartitionCheckInterval()
+ {
+ return repartitionCheckInterval;
+ }
+
+ public void setRepartitionInterval(long repartitionInterval)
+ {
+ this.repartitionInterval = repartitionInterval;
+ }
+
+ /**
+ * Minimal interval between 2 (re)partition actions
+ */
+ public long getRepartitionInterval()
+ {
+ return repartitionInterval;
+ }
+
+ /**
+ * @omitFromUI
+ * @return current checkpointed offsets
+ */
+ public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
+ {
+ return offsetTrack;
+ }
+}
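
For reference, a short sketch of how the extra consumer properties documented in getConsumerProps()
might be supplied; the property key and value below are illustrative, and must avoid the keys the
operator reserves for itself (bootstrap.servers, group.id, auto.offset.reset, and so on):

  // kafkaInput is a configured AbstractKafkaInputOperator instance (placeholder name)
  Properties extra = new Properties();
  // any Kafka 0.9 new-consumer setting the operator does not set itself
  extra.put("fetch.min.bytes", "1");
  kafkaInput.setConsumerProps(extra);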
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
new file mode 100644
index 0000000..0fdd721
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Joiner;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+
+/**
+ * Abstract partitioner used to manage the partitions of kafka input operator.
+ * It uses a number of Kafka consumers (one for each cluster) to get the latest partition metadata for the topics
+ * the consumer subscribes to, and exposes those to the subclass which implements the assign method
+ *
+ * The partitioner is always stateless.
+ */
+public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKafkaInputOperator>, StatsListener
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaPartitioner.class);
+
+  private static final String META_CONSUMER_GROUP_NAME = AbstractKafkaInputOperator.class.getName() + "META_GROUP";
+
+ protected final String[] clusters;
+
+ protected final String[] topics;
+
+ protected final AbstractKafkaInputOperator prototypeOperator;
+
+ private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients;
+
+
+  private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); // prevent null
+
+  public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator)
+ {
+ this.clusters = clusters;
+ this.topics = topics;
+ this.prototypeOperator = prototypeOperator;
+ }
+
+  abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata);
+
+
+
+ @Override
+  public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+      Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
+ {
+
+ initMetadataClients();
+
+ Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
+
+
+ for (int i = 0; i < clusters.length; i++) {
+ metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+ for (String topic : topics) {
+        List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
+ if (logger.isDebugEnabled()) {
+          logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+ }
+ metadata.get(clusters[i]).put(topic, ptis);
+ }
+ metadataRefreshClients.get(i).close();
+ }
+
+ metadataRefreshClients = null;
+
+ List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = assign(metadata);
+
+
+ if (currentPartitions == parts || currentPartitions.equals(parts)) {
+ logger.debug("No partition change found");
+ return collection;
+ } else {
+ logger.info("Partition change detected: ");
+ currentPartitions.clear();
+ currentPartitions.addAll(parts);
+ int i = 0;
+ List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>();
+      for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext();) {
+        Partition<AbstractKafkaInputOperator> nextPartition = iter.next();
+        if (parts.remove(nextPartition.getPartitionedInstance().assignment())) {
+ if (logger.isInfoEnabled()) {
+ logger.info("[Existing] Partition {} with assignment {} ", i,
+                Joiner.on(';').join(nextPartition.getPartitionedInstance().assignment()));
+ }
+ result.add(nextPartition);
+ i++;
+ }
+ }
+
+      for (Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment : parts) {
+ if (logger.isInfoEnabled()) {
+ logger.info("[New] Partition {} with assignment {} ", i,
+ Joiner.on(';').join(partitionAssignment));
+ }
+ result.add(createPartition(partitionAssignment));
+ i++;
+ }
+
+ return result;
+ }
+ }
+
+ @Override
+  public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
+ {
+
+ }
+
+ @Override
+ public Response processStats(BatchedOperatorStats batchedOperatorStats)
+ {
+ Response response = new Response();
+ response.repartitionRequired = true;
+ return response;
+ }
+
+  protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
+ {
+ Kryo kryo = new Kryo();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Output output = new Output(bos);
+ kryo.writeObject(output, prototypeOperator);
+ output.close();
+ Input lInput = new Input(bos.toByteArray());
+    Partitioner.Partition<AbstractKafkaInputOperator> p = (Partitioner.Partition<AbstractKafkaInputOperator>)
+        new DefaultPartition<>(kryo.readObject(lInput, prototypeOperator.getClass()));
+ p.getPartitionedInstance().assign(partitionAssignment);
+ return p;
+ }
+ /**
+ *
+ */
+ private void initMetadataClients()
+ {
+    if (metadataRefreshClients != null && metadataRefreshClients.size() == clusters.length) {
+ // The metadata client is active
+ return;
+ }
+
+ if (clusters == null || clusters.length == 0) {
+ throw new IllegalStateException("clusters can not be null");
+ }
+
+ metadataRefreshClients = new ArrayList<>(clusters.length);
+ int index = 0;
+ for (String c : clusters) {
+ Properties prop = new Properties();
+ prop.put("group.id", META_CONSUMER_GROUP_NAME);
+ prop.put("bootstrap.servers", c);
+ prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
+ prop.put("enable.auto.commit", "false");
+      metadataRefreshClients.add(index++, new KafkaConsumer<byte[], byte[]>(prop));
+ }
+ }
+
+ /**
+ * The key object used in the assignment map for each operator
+ */
+ public static class PartitionMeta
+ {
+
+ public PartitionMeta()
+ {
+ }
+
+ public PartitionMeta(String cluster, String topic, int partitionId)
+ {
+ this.cluster = cluster;
+ this.topic = topic;
+ this.partitionId = partitionId;
+ this.topicPartition = new TopicPartition(topic, partitionId);
+ }
+
+ private String cluster;
+
+ private transient TopicPartition topicPartition;
+
+ private String topic;
+
+ private int partitionId;
+
+ public String getCluster()
+ {
+ return cluster;
+ }
+
+ public int getPartitionId()
+ {
+ return partitionId;
+ }
+
+ public String getTopic()
+ {
+ return topic;
+ }
+
+ public TopicPartition getTopicPartition()
+ {
+ if (topicPartition == null) {
+ topicPartition = new TopicPartition(topic, partitionId);
+ }
+ return topicPartition;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionMeta that = (PartitionMeta)o;
+ return Objects.equals(cluster, that.cluster) &&
+ Objects.equals(topicPartition, that.topicPartition);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(cluster, topicPartition);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PartitionMeta{" +
+ "cluster='" + cluster + '\'' +
+ ", topicPartition=" + topicPartition +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
new file mode 100644
index 0000000..0903570
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is the wrapper class for new Kafka consumer API
+ *
+ * It starts a number of consumers (one for each cluster) in the same number of threads.
+ * Maintains the consumer offsets
+ *
+ * It also uses the consumers to commit the application-processed offsets along with the application name
+ *
+ */
+public class KafkaConsumerWrapper implements Closeable
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+
+ private boolean isAlive = false;
+
+  private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new HashMap<>();
+
+ // The in memory buffer hold consumed messages
+  private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> holdingBuffer;
+
+ private AbstractKafkaInputOperator ownerOperator = null;
+
+ private ExecutorService kafkaConsumerExecutor;
+
+  private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>();
+
+ /**
+ *
+   * Only put the offsets that need to be committed in the ConsumerThread.offsetToCommit map
+ * The consumer thread will commit the offset(s)
+ *
+ * @param offsetsInWindow
+ */
+  public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsInWindow)
+ {
+ if (offsetsInWindow == null) {
+ return;
+ }
+
+ // group offsets by cluster and topic partition
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : offsetsInWindow.entrySet()) {
+ String cluster = e.getKey().getCluster();
+      Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMap = offsetsToCommit.get(cluster);
+ if (topicPartitionOffsetMap == null) {
+        logger.warn("committed offset map should be initialized by consumer thread!");
+ continue;
+ }
+      topicPartitionOffsetMap.put(e.getKey().getTopicPartition(), new OffsetAndMetadata(e.getValue()));
+ }
+
+ }
+
+
+ static final class ConsumerThread implements Runnable
+ {
+
+ private final KafkaConsumer<byte[], byte[]> consumer;
+
+ private final String cluster;
+
+ private final KafkaConsumerWrapper wrapper;
+
+ private Map<TopicPartition, OffsetAndMetadata> offsetToCommit = null;
+
+    public ConsumerThread(String cluster, KafkaConsumer<byte[], byte[]> consumer, KafkaConsumerWrapper wrapper)
+ {
+ this.cluster = cluster;
+ this.consumer = consumer;
+ this.wrapper = wrapper;
+ this.offsetToCommit = new ConcurrentHashMap<>();
+ wrapper.offsetsToCommit.put(cluster, offsetToCommit);
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+
+
+ while (wrapper.isAlive) {
+ if (!this.offsetToCommit.isEmpty()) {
+ // in each fetch cycle commit the offset if needed
+ if (logger.isDebugEnabled()) {
+            logger.debug("Commit offsets {}", Joiner.on(';').withKeyValueSeparator("=").join(this.offsetToCommit));
+ }
+ consumer.commitAsync(offsetToCommit, wrapper.ownerOperator);
+ offsetToCommit.clear();
+ }
+ try {
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(wrapper.ownerOperator.getConsumerTimeout());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ wrapper.putMessage(Pair.of(cluster, record));
+ }
+ } catch (NoOffsetForPartitionException e) {
+            // if initialOffset is set to EARLIEST or LATEST
+            // and the application is run for the first time
+            // then there is no existing committed offset and this error will be caught
+            // we need to seek to either beginning or end of the partition
+            // based on the initial offset setting
+            AbstractKafkaInputOperator.InitialOffset io =
+                AbstractKafkaInputOperator.InitialOffset.valueOf(wrapper.ownerOperator.getInitialOffset());
+            if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
+                || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
+              consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
+            } else {
+              consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e);
+ }
+ }
+ } catch (WakeupException we) {
+ logger.info("The consumer is being stopped");
+ } finally {
+ consumer.close();
+ }
+ }
+ }
+
+
+ /**
+ * This method is called in setup method of Abstract Kafka Input Operator
+ */
+ public void create(AbstractKafkaInputOperator ownerOperator)
+ {
+    holdingBuffer = new ArrayBlockingQueue<>(ownerOperator.getHoldingBufferSize());
+ this.ownerOperator = ownerOperator;
+    logger.info("Create consumer wrapper with holding buffer size: {} ", ownerOperator.getHoldingBufferSize());
+ if (logger.isInfoEnabled()) {
+      logger.info("Assignments are {} ", Joiner.on('\n').join(ownerOperator.assignment()));
+ }
+ }
+
+
+ /**
+ * This method is called in the activate method of the operator
+ */
+ public void start()
+ {
+ isAlive = true;
+
+
+ // thread to consume the kafka data
+ // create thread pool for consumer threads
+ kafkaConsumerExecutor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());
+
+ // group list of PartitionMeta by cluster
+ Map<String, List<TopicPartition>> consumerAssignment = new HashMap<>();
+    Set<AbstractKafkaPartitioner.PartitionMeta> assignments = ownerOperator.assignment();
+ for (AbstractKafkaPartitioner.PartitionMeta partitionMeta : assignments) {
+ String cluster = partitionMeta.getCluster();
+ List<TopicPartition> cAssignment = consumerAssignment.get(cluster);
+ if (cAssignment == null) {
+ cAssignment = new LinkedList<>();
+ consumerAssignment.put(cluster, cAssignment);
+ }
+      cAssignment.add(new TopicPartition(partitionMeta.getTopic(), partitionMeta.getPartitionId()));
+ }
+
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack();
+
+
+ // create one thread for each cluster
+    // each thread uses one KafkaConsumer to consume from 1+ partition(s) of 1+ topic(s)
+    for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {
+
+ Properties prop = new Properties();
+ if (ownerOperator.getConsumerProps() != null) {
+ prop.putAll(ownerOperator.getConsumerProps());
+ }
+
+ prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, e.getKey());
+ prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ // never auto commit the offsets
+ prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ AbstractKafkaInputOperator.InitialOffset initialOffset =
+          AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+
+      if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST ||
+          initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
+        // commit the offset with application name if we set initialoffset to application
+        prop.put(ConsumerConfig.GROUP_ID_CONFIG, ownerOperator.getApplicationName() + "_Consumer");
+ }
+
+ KafkaConsumer<byte[], byte[]> kc = new KafkaConsumer<>(prop);
+ kc.assign(e.getValue());
+ if (logger.isInfoEnabled()) {
+        logger.info("Create consumer with properties {} ", Joiner.on(";").withKeyValueSeparator("=").join(prop));
+        logger.info("Assign consumer to {}", Joiner.on('#').join(e.getValue()));
+ }
+ if (currentOffset != null && !currentOffset.isEmpty()) {
+ for (TopicPartition tp : e.getValue()) {
+ AbstractKafkaPartitioner.PartitionMeta partitionKey =
+              new AbstractKafkaPartitioner.PartitionMeta(e.getKey(), tp.topic(), tp.partition());
+ if (currentOffset.containsKey(partitionKey)) {
+ kc.seek(tp, currentOffset.get(partitionKey));
+ }
+ }
+ }
+
+ consumers.put(e.getKey(), kc);
+ kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
+ }
+
+
+ }
+
+ /**
+ * The method is called in the deactivate method of the operator
+ */
+ public void stop()
+ {
+ for (KafkaConsumer<byte[], byte[]> c : consumers.values()) {
+ c.wakeup();
+ }
+ kafkaConsumerExecutor.shutdownNow();
+ isAlive = false;
+ holdingBuffer.clear();
+ IOUtils.closeQuietly(this);
+ }
+
+ /**
+ * This method is called in teardown method of the operator
+ */
+ public void teardown()
+ {
+ holdingBuffer.clear();
+ }
+
+ public boolean isAlive()
+ {
+ return isAlive;
+ }
+
+ public void setAlive(boolean isAlive)
+ {
+ this.isAlive = isAlive;
+ }
+
+ public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
+ {
+ return holdingBuffer.poll();
+ }
+
+ public int messageSize()
+ {
+ return holdingBuffer.size();
+ }
+
+  protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> msg) throws InterruptedException
+ {
+ // block from receiving more message
+ holdingBuffer.put(msg);
+ }
+
+
+ @Override
+ public void close() throws IOException
+ {
+ }
+
+ public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics()
+ {
+ Map<String, Map<MetricName, ? extends Metric>> val = new HashMap<>();
+    for (Map.Entry<String, KafkaConsumer<byte[], byte[]>> e : consumers.entrySet()) {
+ val.put(e.getKey(), e.getValue().metrics());
+ }
+ return val;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
new file mode 100644
index 0000000..fdb1252
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * Metrics class
+ */
+public class KafkaMetrics implements Serializable
+{
+ private KafkaConsumerStats[] stats;
+
+ private transient long lastMetricSampleTime = 0L;
+
+ private transient long metricsRefreshInterval;
+
+ public KafkaMetrics(long metricsRefreshInterval)
+ {
+ this.metricsRefreshInterval = metricsRefreshInterval;
+ }
+
+  void updateMetrics(String[] clusters, Map<String, Map<MetricName, ? extends Metric>> metricsMap)
+ {
+ long current = System.currentTimeMillis();
+ if (current - lastMetricSampleTime < metricsRefreshInterval) {
+ return;
+ }
+
+ lastMetricSampleTime = current;
+
+ if (stats == null) {
+ stats = new KafkaConsumerStats[clusters.length];
+ }
+
+ for (int i = 0; i < clusters.length; i++) {
+ if (stats[i] == null) {
+ stats[i] = new KafkaConsumerStats();
+ stats[i].cluster = clusters[i];
+ }
+ Map<MetricName, ? extends Metric> cMetrics = metricsMap.get(clusters[i]);
+ if (cMetrics == null || cMetrics.isEmpty()) {
+ stats[i].bytesPerSec = 0;
+ stats[i].msgsPerSec = 0;
+ continue;
+ }
+ if (stats[i].bytePerSecMK == null || stats[i].msgPerSecMK == null) {
+ for (MetricName mn : cMetrics.keySet()) {
+ if (mn.name().equals("bytes-consumed-rate")) {
+ stats[i].bytePerSecMK = mn;
+ } else if (mn.name().equals("records-consumed-rate")) {
+ stats[i].msgPerSecMK = mn;
+ }
+ }
+ }
+ stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
+ stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();
+ }
+ }
+
+ public KafkaConsumerStats[] getStats()
+ {
+ return stats;
+ }
+
+ /**
+ * Counter class which gives the statistic value from the consumer
+ */
+ public static class KafkaConsumerStats implements Serializable
+ {
+ private static final long serialVersionUID = -2867402654990209006L;
+
+ public transient MetricName msgPerSecMK;
+ public transient MetricName bytePerSecMK;
+
+ public String cluster;
+ /**
+ * Metrics for each consumer
+ */
+ public double msgsPerSec;
+
+ public double bytesPerSec;
+
+ public KafkaConsumerStats()
+ {
+ }
+ }
+
+  public static class KafkaMetricsAggregator implements AutoMetric.Aggregator, Serializable
+ {
+
+ @Override
+    public Map<String, Object> aggregate(long l, Collection<AutoMetric.PhysicalMetricsContext> collection)
+ {
+ double totalBytesPerSec = 0;
+ double totalMsgsPerSec = 0;
+ Map<String, Object> total = new HashMap<>();
+ for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+ KafkaMetrics km = (KafkaMetrics)pmc.getMetrics().get("metrics");
+ for (KafkaConsumerStats kcs : km.stats) {
+ totalBytesPerSec += kcs.bytesPerSec;
+ totalMsgsPerSec += kcs.msgsPerSec;
+ }
+ }
+ total.put("totalBytesPerSec", totalBytesPerSec);
+ total.put("totalMsgPerSec", totalMsgsPerSec);
+ return total;
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
new file mode 100644
index 0000000..4a4ebf3
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+
+import java.io.Serializable;
+
+/**
+ * @since 2.1.0
+ */
+public class KafkaPartition implements Serializable
+{
+  protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster";
+
+ @SuppressWarnings("unused")
+ private KafkaPartition()
+ {
+ }
+
+ public KafkaPartition(String topic, int partitionId)
+ {
+ this(DEFAULT_CLUSTERID, topic, partitionId);
+ }
+
+ public KafkaPartition(String clusterId, String topic, int partitionId)
+ {
+ super();
+ this.clusterId = clusterId;
+ this.partitionId = partitionId;
+ this.topic = topic;
+ }
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 7556802229202221546L;
+
+
+ private String clusterId;
+
+ private int partitionId;
+
+ private String topic;
+
+ public String getClusterId()
+ {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId)
+ {
+ this.clusterId = clusterId;
+ }
+
+ public int getPartitionId()
+ {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId)
+ {
+ this.partitionId = partitionId;
+ }
+
+ public String getTopic()
+ {
+ return topic;
+ }
+
+ public void setTopic(String topic)
+ {
+ this.topic = topic;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
+ result = prime * result + partitionId;
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ KafkaPartition other = (KafkaPartition)obj;
+ if (clusterId == null) {
+ if (other.clusterId != null) {
+ return false;
+ }
+ } else if (!clusterId.equals(other.clusterId)) {
+ return false;
+ }
+ if (partitionId != other.partitionId) {
+ return false;
+ }
+ if (topic == null) {
+ if (other.topic != null) {
+ return false;
+ }
+ } else if (!topic.equals(other.topic)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+    return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]";
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
new file mode 100644
index 0000000..e563c42
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * This is just an example of a single port operator that emits only byte array messages
+ * The key and cluster information are ignored
+ * This class emits the value to the single output port
+ */
+public class KafkaSinglePortInputOperator extends AbstractKafkaInputOperator
+{
+
+ /**
+ * This output port emits tuples extracted from Kafka messages.
+ */
+  public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();
+
+ @Override
+  protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+ {
+ outputPort.emit(message.value());
+ }
+
+}
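Usage note (illustrative, not part of the patch): a minimal sketch of how this operator could be wired into an Apex application, using only the setters exercised by the test in this commit (setClusters, setTopics, setInitialOffset). The package name, broker address, topic and the BytesPrinter sink are placeholder assumptions.

    package com.example;  // hypothetical package

    import org.apache.hadoop.conf.Configuration;

    import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.common.util.BaseOperator;

    public class KafkaIngestApplication implements StreamingApplication
    {
      /** Trivial sink that prints each Kafka message payload. */
      public static class BytesPrinter extends BaseOperator
      {
        public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
        {
          @Override
          public void process(byte[] message)
          {
            System.out.println(new String(message));
          }
        };
      }

      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        KafkaSinglePortInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());
        kafkaInput.setClusters("localhost:9092");  // placeholder broker list
        kafkaInput.setTopics("testtopic");         // placeholder topic
        kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());

        BytesPrinter printer = dag.addOperator("printer", new BytesPrinter());
        dag.addStream("kafkaMessages", kafkaInput.outputPort, printer.input);
      }
    }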
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
new file mode 100644
index 0000000..09d22eb
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * A one-to-many partitioner implementation that creates a fixed number of operator partitions and assigns one or
+ * more Kafka partitions to each. It uses round-robin to assign partitions.
+ */
+public class OneToManyPartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToManyPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator protoTypeOperator)
+ {
+ super(clusters, topics, protoTypeOperator);
+ }
+
+ @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata)
+ {
+ if (prototypeOperator.getInitialPartitionCount() <= 0) {
+      throw new IllegalArgumentException("Number of partitions should be greater than or equal to 1");
+ }
+
+ int partitionCount = prototypeOperator.getInitialPartitionCount();
+    ArrayList<Set<PartitionMeta>> eachPartitionAssignment = new ArrayList<>(prototypeOperator.getInitialPartitionCount());
+ int i = 0;
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
+ for (PartitionInfo pif : topicPartition.getValue()) {
+ int index = i++ % partitionCount;
+          // lazily create the bucket for this operator partition; get(index) on a list shorter than index + 1
+          // would throw IndexOutOfBoundsException, so check the size instead
+          if (eachPartitionAssignment.size() <= index) {
+            eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+          }
+          eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
+ }
+ }
+ }
+
+ return eachPartitionAssignment;
+ }
+
+}
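To make the round-robin assignment in assign() above concrete, here is a small standalone sketch (hypothetical numbers, not part of the patch): five Kafka partitions spread over two operator partitions end up as {0, 2, 4} and {1, 3}.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RoundRobinAssignmentSketch
    {
      public static void main(String[] args)
      {
        int operatorPartitions = 2;  // corresponds to getInitialPartitionCount()
        int kafkaPartitions = 5;     // total Kafka partitions across all subscribed topics/clusters
        List<Set<Integer>> assignment = new ArrayList<>(operatorPartitions);
        int i = 0;
        for (int kafkaPartition = 0; kafkaPartition < kafkaPartitions; kafkaPartition++) {
          int index = i++ % operatorPartitions;
          if (assignment.size() <= index) {
            assignment.add(index, new HashSet<Integer>());
          }
          assignment.get(index).add(kafkaPartition);
        }
        System.out.println(assignment);  // prints the two buckets, e.g. [[0, 2, 4], [1, 3]]
      }
    }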
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
new file mode 100644
index 0000000..0cf7d44
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.PartitionInfo;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A one-to-one partitioner implementation that always returns the same number of operator partitions as
+ * Kafka partitions for the topics the operator subscribes to.
+ */
+public class OneToOnePartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToOnePartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator)
+ {
+ super(clusters, topics, prototypeOperator);
+ }
+
+ @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata)
+ {
+ List<Set<PartitionMeta>> currentAssignment = new LinkedList<>();
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
+ for (PartitionInfo pif : topicPartition.getValue()) {
+          currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition())));
+ }
+ }
+ }
+ return currentAssignment;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
----------------------------------------------------------------------
diff --git
a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
new file mode 100644
index 0000000..cb21f3d
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+public enum PartitionStrategy
+{
+ /**
+   * Each operator partition connects to only one Kafka partition
+ */
+ ONE_TO_ONE,
+ /**
+   * Each operator consumes from several Kafka partitions, with the overall input rate kept under a certain hard limit in msgs/s or bytes/s
+   * For now it <b>only</b> supports the <b>simple kafka consumer</b>
+ */
+ ONE_TO_MANY,
+ /**
+ * 1 to N partition based on the heuristic function
+ * <b>NOT</b> implemented yet
+ * TODO implement this later
+ */
+ ONE_TO_MANY_HEURISTIC
+}
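Illustrative sketch (not part of the patch) of how a configured strategy name could be mapped onto this enum; how the operator itself selects and consumes the strategy is not shown in this excerpt.

    import org.apache.apex.malhar.kafka.PartitionStrategy;

    public class PartitionStrategySketch
    {
      public static void main(String[] args)
      {
        String configured = "one_to_many";  // hypothetical value coming from application configuration
        PartitionStrategy strategy = PartitionStrategy.valueOf(configured.toUpperCase());
        switch (strategy) {
          case ONE_TO_ONE:
            System.out.println("one operator partition per Kafka partition");
            break;
          case ONE_TO_MANY:
            System.out.println("fixed number of operator partitions, Kafka partitions assigned round-robin");
            break;
          case ONE_TO_MANY_HEURISTIC:
            System.out.println("not implemented yet");
            break;
        }
      }
    }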
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
new file mode 100644
index 0000000..17bc465
--- /dev/null
+++
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * A set of tests to verify that the input operator is automatically partitioned per Kafka partition. These tests
+ * launch their own Kafka cluster.
+ */
+@RunWith(Parameterized.class)
+public class KafkaInputOperatorTest extends KafkaOperatorTestBase
+{
+
+ private int totalBrokers = 0;
+
+
+
+ @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
+ public static Collection<Boolean[]> testScenario()
+ {
+    return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
+ {true, true}, // multi cluster with multi partitions
+ {false, true}, // single cluster with multi partitions
+ {false, false}, // single cluster with single partitions
+ });
+ }
+
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+ {
+    // This test wants to initialize several Kafka brokers for multiple partitions
+ this.hasMultiCluster = hasMultiCluster;
+ this.hasMultiPartition = hasMultiPartition;
+ int cluster = 1 + (hasMultiCluster ? 1 : 0);
+ totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+
+ }
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
+ private static List<String> tupleCollection = new LinkedList<>();
+ private static CountDownLatch latch;
+ private static boolean hasFailure = false;
+ private static int failureTrigger = 3000;
+ private static int k = 0;
+
+ /**
+   * Test operator to collect tuples from the KafkaSinglePortInputOperator under test.
+ */
+ public static class CollectorModule extends BaseOperator
+ {
+    public final transient CollectorInputPort inputPort = new CollectorInputPort();
+ }
+
+ public static class CollectorInputPort extends DefaultInputPort<byte[]>
+ {
+
+ @Override
+ public void process(byte[] bt)
+ {
+ String tuple = new String(bt);
+ if (hasFailure && k++ == failureTrigger) {
+ //you can only kill yourself once
+ hasFailure = false;
+ throw new RuntimeException();
+ }
+ if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
+ if (latch != null) {
+ latch.countDown();
+ }
+ return;
+ }
+ tupleCollection.add(tuple);
+ }
+
+ @Override
+ public void setConnected(boolean flag)
+ {
+ if (flag) {
+ tupleCollection.clear();
+ }
+ }
+ }
+
+ /**
+   * Test the KafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
+   * data from an outside test generator through the Kafka message bus and feeds that data into the Malhar
+   * streaming platform.
+   *
+   * [Generate messages and send them to the Kafka message bus] ==> [Receive those messages through the Kafka
+   * input adapter (i.e. consumer) and send them downstream using the emitTuples() interface on the output port]
+   *
+ * @throws Exception
+ */
+ @Test
+ public void testPartitionableInputOperator() throws Exception
+ {
+ hasFailure = false;
+ testInputOperator(false);
+ }
+
+
+ @Test
+ public void testPartitionableInputOperatorWithFailure() throws Exception
+ {
+ hasFailure = true;
+ testInputOperator(true);
+ }
+
+ public void testInputOperator(boolean hasFailure) throws Exception
+ {
+
+    // each broker should get an END_TUPLE message
+ latch = new CountDownLatch(totalBrokers);
+
+ int totalCount = 10000;
+
+ // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, hasMultiPartition, hasMultiCluster);
+ p.setSendCount(totalCount);
+ Thread t = new Thread(p);
+ t.start();
+
+ // Create DAG for testing.
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+    // Create KafkaSinglePortInputOperator
+    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
+ node.setInitialPartitionCount(1);
+ // set topic
+ node.setTopics(TEST_TOPIC);
+    node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ node.setClusters(getClusterConfig());
+
+ // Create Test tuple collector
+    CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());
+
+ // Connect ports
+    dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ if (hasFailure) {
+ setupHasFailureTest(node, dag);
+ }
+ lc.runAsync();
+
+    // Wait up to 40s for the consumer to finish consuming all the messages
+ boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);
+
+ // Check results
+ Assert.assertEquals("Tuple count", totalCount, tupleCollection.size());
+    logger.debug(String.format("Number of emitted tuples: %d", tupleCollection.size()));
+
+ t.join();
+ p.close();
+ lc.shutdown();
+ }
+
+  private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
+ {
+ operator.setHoldingBufferSize(5000);
+ dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent("target/ck", new Configuration()));
+ operator.setMaxTuplesPerWindow(500);
+ }
+
+  // builds the clusters property: brokers of the same cluster are separated by ',' and clusters by ';'
+  private String getClusterConfig()
+  {
+    String l = "localhost:";
+    return l + TEST_KAFKA_BROKER_PORT[0][0] +
+      (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
+      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
+      (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
+ }
+
+
+}
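For reference, getClusterConfig() above builds the clusters string in the format the operator expects: brokers of the same cluster are separated by ',' and different clusters by ';'. A minimal sketch with hypothetical fixed ports (the test base below picks free ports at runtime):

    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

    public class ClusterConfigSketch
    {
      public static void main(String[] args)
      {
        // two clusters, each with two brokers; ports are placeholders
        KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
        kafkaInput.setClusters("localhost:9092,localhost:9093;localhost:9192,localhost:9193");
      }
    }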
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
new file mode 100644
index 0000000..7085348
--- /dev/null
+++
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import kafka.admin.TopicCommand;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZkUtils;
+
+/**
+ * This is a base class that sets up and cleans the Kafka testing environment for all the input/output tests.
+ * If it's a multi-partition test, this class creates 2 Kafka partitions.
+ */
+public class KafkaOperatorTestBase
+{
+
+ public static final String END_TUPLE = "END_TUPLE";
+ public static final int[] TEST_ZOOKEEPER_PORT;
+ public static final int[][] TEST_KAFKA_BROKER_PORT;
+ public static final String TEST_TOPIC = "testtopic";
+
+ // get available ports
+ static {
+ ServerSocket[] listeners = new ServerSocket[6];
+ int[] p = new int[6];
+
+ try {
+ for (int i = 0; i < 6; i++) {
+ listeners[i] = new ServerSocket(0);
+ p[i] = listeners[i].getLocalPort();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ listeners[i].close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
+ TEST_KAFKA_BROKER_PORT = new int[][] {
+ new int[] {p[2], p[3]},
+ new int[] {p[4], p[5]}
+ };
+ }
+
+  static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
+  // since Kafka 0.8, use KafkaServerStartable instead of KafkaServer
+
+ // multiple brokers in multiple cluster
+ private KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
+
+ // multiple cluster
+ private ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
+
+ private ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
+
+ public String baseDir = "target";
+
+ private final String zkBaseDir = "zookeeper-server-data";
+ private final String kafkaBaseDir = "kafka-server-data";
+  private final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
+  private final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
+ protected boolean hasMultiPartition = false;
+ protected boolean hasMultiCluster = false;
+
+ public void startZookeeper(final int clusterId)
+ {
+ try {
+
+ int numConnections = 10;
+ int tickTime = 2000;
+ File dir = new File(baseDir, zkdir[clusterId]);
+
+ zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
+ zkFactory[clusterId] = new NIOServerCnxnFactory();
+      zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
+
+      zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server
+ Thread.sleep(2000);
+ //kserver.startup();
+ } catch (Exception ex) {
+ logger.error(ex.getLocalizedMessage());
+ }
+ }
+
+ public void stopZookeeper()
+ {
+ for (ZooKeeperServer zs : zkServer) {
+ if (zs != null) {
+ zs.shutdown();
+ }
+ }
+
+ for (ServerCnxnFactory zkf : zkFactory) {
+ if (zkf != null) {
+ zkf.closeAll();
+ zkf.shutdown();
+ }
+ }
+ zkServer = new ZooKeeperServer[2];
+ zkFactory = new ServerCnxnFactory[2];
+ }
+
+ public void startKafkaServer(int clusterid, int brokerid)
+ {
+ Properties props = new Properties();
+ props.setProperty("broker.id", "" + brokerid);
+ props.setProperty("log.dirs", new File(baseDir,
kafkadir[clusterid][brokerid]).toString());
+ props.setProperty("zookeeper.connect", "localhost:" +
TEST_ZOOKEEPER_PORT[clusterid]);
+ props.setProperty("port", "" +
TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
+ props.setProperty("default.replication.factor", "1");
+    // set this to 50000 to boost performance so most test data stays in memory before being flushed to disk
+ props.setProperty("log.flush.interval.messages", "50000");
+ if (hasMultiPartition) {
+ props.setProperty("num.partitions", "2");
+ } else {
+ props.setProperty("num.partitions", "1");
+ }
+
+    broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
+ broker[clusterid][brokerid].startup();
+
+ }
+
+ public void startKafkaServer()
+ {
+
+ FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
+    boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+ for (int i = 0; i < startable.length; i++) {
+ for (int j = 0; j < startable[i].length; j++) {
+        if (startable[i][j]) {
+          startKafkaServer(i, j);
+        }
+ }
+ }
+
+    // startup is an asynchronous operation; wait a couple of seconds for the servers to start up
+
+ }
+
+ public void stopKafkaServer()
+ {
+ for (int i = 0; i < broker.length; i++) {
+ for (int j = 0; j < broker[i].length; j++) {
+ if (broker[i][j] != null) {
+ broker[i][j].shutdown();
+ broker[i][j].awaitShutdown();
+ broker[i][j] = null;
+ }
+ }
+ }
+ }
+
+ @Before
+ public void beforeTest()
+ {
+ try {
+ startZookeeper();
+ startKafkaServer();
+ createTopic(0, TEST_TOPIC);
+ if (hasMultiCluster) {
+ createTopic(1, TEST_TOPIC);
+ }
+ } catch (java.nio.channels.CancelledKeyException ex) {
+ logger.debug("LSHIL {}", ex.getLocalizedMessage());
+ }
+ }
+
+ public void startZookeeper()
+ {
+ FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
+ startZookeeper(0);
+ if (hasMultiCluster) {
+ startZookeeper(1);
+ }
+ }
+
+ public void createTopic(int clusterid, String topicName)
+ {
+ String[] args = new String[9];
+ args[0] = "--zookeeper";
+ args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
+ args[2] = "--replication-factor";
+ args[3] = "1";
+ args[4] = "--partitions";
+ if (hasMultiPartition) {
+ args[5] = "2";
+ } else {
+ args[5] = "1";
+ }
+ args[6] = "--topic";
+ args[7] = topicName;
+ args[8] = "--create";
+
+    ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
+ TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
+
+    // Right now, there is no programmatic, synchronous way to create the topic, so wait a few seconds to make
+    // sure the topic is created and the tests do not hit any bizarre failure
+ try {
+ Thread.sleep(5000);
+ zu.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @After
+ public void afterTest()
+ {
+ try {
+ stopKafkaServer();
+ stopZookeeper();
+ } catch (Exception ex) {
+ logger.debug("LSHIL {}", ex.getLocalizedMessage());
+ }
+ }
+
+ public void setHasMultiPartition(boolean hasMultiPartition)
+ {
+ this.hasMultiPartition = hasMultiPartition;
+ }
+
+ public void setHasMultiCluster(boolean hasMultiCluster)
+ {
+ this.hasMultiCluster = hasMultiCluster;
+ }
+
+ public static class TestZookeeperServer extends ZooKeeperServer
+ {
+
+ public TestZookeeperServer()
+ {
+ super();
+ // TODO Auto-generated constructor stub
+ }
+
+    public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
+ {
+ super(snapDir, logDir, tickTime);
+ // TODO Auto-generated constructor stub
+ }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
+ {
+ super(txnLogFactory, treeBuilder);
+ // TODO Auto-generated constructor stub
+ }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
+ {
+ super(txnLogFactory, tickTime, treeBuilder);
+ // TODO Auto-generated constructor stub
+ }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+ {
+      super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ protected void registerJMX()
+ {
+ }
+
+ @Override
+ protected void unregisterJMX()
+ {
+ }
+
+ }
+}