APEXMALHAR-2459 1) Refactor the existing Kafka input operator. 2) Add support for a KafkaInputOperator based on the 0.10 consumer API
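For context, a minimal usage sketch (not part of the commit itself) of how the refactored input operator is wired into an Apex application. The setters used below are defined in AbstractKafkaInputOperator in this patch; the output port name (outputPort) and the ConsoleOutputOperator sink are illustrative assumptions based on Malhar conventions.

import org.apache.hadoop.conf.Configuration;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

// Sketch: wire the 0.10 Kafka input operator into an Apex DAG.
public class KafkaReaderApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());
    in.setClusters("localhost:9092");               // bootstrap.servers; use ";" to separate multiple clusters
    in.setTopics("test-topic");                     // ","-separated topic list
    in.setInitialOffset("APPLICATION_OR_EARLIEST"); // resume from committed offsets, else start from earliest
    in.setStrategy("ONE_TO_ONE");                   // one operator partition per Kafka partition

    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("records", in.outputPort, console.input); // port names assumed, not shown in this diff
  }
}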
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b42d8e74 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b42d8e74 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b42d8e74 Branch: refs/heads/master Commit: b42d8e741cf2c8241749351b31e8fd7fef546916 Parents: 4df6858 Author: chaitanya <chai...@apache.org> Authored: Wed Apr 5 15:31:47 2017 +0530 Committer: chaitanya <chai...@apache.org> Committed: Sun Jul 2 23:14:06 2017 +0530 ---------------------------------------------------------------------- kafka/XmlJavadocCommentsExtractor.xsl | 48 -- .../XmlJavadocCommentsExtractor.xsl | 48 ++ kafka/kafka-common/pom.xml | 223 +++++++ .../malhar/kafka/AbstractKafkaConsumer.java | 127 ++++ .../kafka/AbstractKafkaInputOperator.java | 635 +++++++++++++++++++ .../kafka/AbstractKafkaOutputOperator.java | 126 ++++ .../malhar/kafka/AbstractKafkaPartitioner.java | 335 ++++++++++ .../apex/malhar/kafka/KafkaConsumerWrapper.java | 387 +++++++++++ .../apache/apex/malhar/kafka/KafkaMetrics.java | 140 ++++ .../apex/malhar/kafka/KafkaPartition.java | 142 +++++ .../kafka/KafkaSinglePortOutputOperator.java | 46 ++ .../apex/malhar/kafka/OneToManyPartitioner.java | 72 +++ .../apex/malhar/kafka/OneToOnePartitioner.java | 61 ++ .../apex/malhar/kafka/PartitionStrategy.java | 45 ++ .../AbstractKafkaConsumerPropertiesTest.java | 84 +++ .../src/test/resources/log4j.properties | 49 ++ kafka/kafka010/XmlJavadocCommentsExtractor.xsl | 48 ++ kafka/kafka010/pom.xml | 98 +++ .../apex/malhar/kafka/KafkaConsumer010.java | 200 ++++++ .../kafka/KafkaSinglePortInputOperator.java | 59 ++ .../kafka/KafkaConsumerPropertiesTest.java | 35 + .../malhar/kafka/KafkaInputOperatorTest.java | 397 ++++++++++++ .../malhar/kafka/KafkaOperatorTestBase.java | 285 +++++++++ .../apex/malhar/kafka/KafkaTestPartitioner.java | 64 ++ .../apex/malhar/kafka/KafkaTestProducer.java | 181 ++++++ .../src/test/resources/log4j.properties | 49 ++ kafka/kafka09/XmlJavadocCommentsExtractor.xsl | 48 ++ kafka/kafka09/pom.xml | 86 +++ .../apex/malhar/kafka/KafkaConsumer09.java | 200 ++++++ ...afkaSinglePortExactlyOnceOutputOperator.java | 413 ++++++++++++ .../kafka/KafkaSinglePortInputOperator.java | 60 ++ .../apache/apex/malhar/kafka/EmbeddedKafka.java | 166 +++++ .../kafka/KafkaConsumerPropertiesTest.java | 35 + .../apache/apex/malhar/kafka/KafkaHelper.java | 66 ++ .../malhar/kafka/KafkaInputOperatorTest.java | 397 ++++++++++++ .../malhar/kafka/KafkaOperatorTestBase.java | 285 +++++++++ .../malhar/kafka/KafkaOutputOperatorTest.java | 425 +++++++++++++ .../apex/malhar/kafka/KafkaTestPartitioner.java | 64 ++ .../apex/malhar/kafka/KafkaTestProducer.java | 181 ++++++ .../kafka09/src/test/resources/log4j.properties | 49 ++ kafka/pom.xml | 306 ++++----- .../kafka/AbstractKafkaInputOperator.java | 622 ------------------ .../kafka/AbstractKafkaOutputOperator.java | 126 ---- .../malhar/kafka/AbstractKafkaPartitioner.java | 335 ---------- .../apex/malhar/kafka/KafkaConsumerWrapper.java | 389 ------------ .../apache/apex/malhar/kafka/KafkaMetrics.java | 140 ---- .../apex/malhar/kafka/KafkaPartition.java | 142 ----- ...afkaSinglePortExactlyOnceOutputOperator.java | 413 ------------ .../kafka/KafkaSinglePortInputOperator.java | 46 -- .../kafka/KafkaSinglePortOutputOperator.java | 46 -- .../apex/malhar/kafka/OneToManyPartitioner.java | 72 --- .../apex/malhar/kafka/OneToOnePartitioner.java | 61 -- .../apex/malhar/kafka/PartitionStrategy.java | 45 -- 
.../apache/apex/malhar/kafka/EmbeddedKafka.java | 166 ----- .../kafka/KafkaConsumerPropertiesTest.java | 81 --- .../apache/apex/malhar/kafka/KafkaHelper.java | 66 -- .../malhar/kafka/KafkaInputOperatorTest.java | 399 ------------ .../malhar/kafka/KafkaOperatorTestBase.java | 292 --------- .../malhar/kafka/KafkaOutputOperatorTest.java | 429 ------------- .../apex/malhar/kafka/KafkaTestPartitioner.java | 64 -- .../apex/malhar/kafka/KafkaTestProducer.java | 179 ------ kafka/src/test/resources/log4j.properties | 50 -- sql/pom.xml | 6 - 63 files changed, 6549 insertions(+), 4385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/kafka/XmlJavadocCommentsExtractor.xsl b/kafka/XmlJavadocCommentsExtractor.xsl deleted file mode 100644 index ec72325..0000000 --- a/kafka/XmlJavadocCommentsExtractor.xsl +++ /dev/null @@ -1,48 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<!-- - Document : XmlJavadocCommentsExtractor.xsl - Created on : September 16, 2014, 11:30 AM - Description: - The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. ---> - -<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> - <xsl:output method="xml" standalone="yes"/> - - <!-- copy xml by selecting only the following nodes, attrbutes and text --> - <xsl:template match="node()|text()|@*"> - <xsl:copy> - <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> - </xsl:copy> - </xsl:template> - - <!-- Strip off the following paths from the selected xml --> - <xsl:template match="//root/package/interface/interface - |//root/package/interface/method/@qualified - |//root/package/class/interface - |//root/package/class/class - |//root/package/class/method/@qualified - |//root/package/class/field/@qualified" /> - - <xsl:strip-space elements="*"/> -</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl b/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl new file mode 100644 index 0000000..ec72325 --- /dev/null +++ b/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<!-- + Document : XmlJavadocCommentsExtractor.xsl + Created on : September 16, 2014, 11:30 AM + Description: + The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. +--> + +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> + <xsl:output method="xml" standalone="yes"/> + + <!-- copy xml by selecting only the following nodes, attrbutes and text --> + <xsl:template match="node()|text()|@*"> + <xsl:copy> + <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> + </xsl:copy> + </xsl:template> + + <!-- Strip off the following paths from the selected xml --> + <xsl:template match="//root/package/interface/interface + |//root/package/interface/method/@qualified + |//root/package/class/interface + |//root/package/class/class + |//root/package/class/method/@qualified + |//root/package/class/field/@qualified" /> + + <xsl:strip-space elements="*"/> +</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/pom.xml ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/pom.xml b/kafka/kafka-common/pom.xml new file mode 100755 index 0000000..71a8a09 --- /dev/null +++ b/kafka/kafka-common/pom.xml @@ -0,0 +1,223 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-kafka-connectors</artifactId> + <version>3.8.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-kafka-common</artifactId> + <packaging>jar</packaging> + <name>Apache Apex Malhar Common Utilities for Kafka Support</name> + + <properties> + <checkstyle.console>false</checkstyle.console> + </properties> + + <build> + <plugins> + <!-- Publish tests jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + <!-- create resource directory for xml javadoc--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>createJavadocDirectory</id> + <phase>generate-resources</phase> + <configuration> + <tasks> + <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> + <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- generate javdoc --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <!-- generate xml javadoc --> + <execution> + <id>xml-doclet</id> + <phase>generate-resources</phase> + <goals> + <goal>javadoc</goal> + </goals> + <configuration> + <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> + <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> + <useStandardDocletOptions>false</useStandardDocletOptions> + <docletArtifact> + <groupId>com.github.markusbernhardt</groupId> + <artifactId>xml-doclet</artifactId> + <version>1.0.4</version> + </docletArtifact> + </configuration> + </execution> + <!-- generate default javadoc jar with custom tags --> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <skip>true</skip> + <tags> + <tag> + <name>customTag1</name> + <placement>a</placement> + <head>Custom Tag One:</head> + </tag> + <tag> + <name>customTag2</name> + <placement>a</placement> + <head>Custom Tag two:</head> + </tag> + <tag> + <name>customTag3</name> + <placement>a</placement> + <head>Custom Tag three:</head> + </tag> + </tags> + </configuration> + </execution> + </executions> + </plugin> + <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>xml-maven-plugin</artifactId> + <version>1.0</version> + <executions> + <execution> + <id>transform-xmljavadoc</id> + <phase>generate-resources</phase> + <goals> + <goal>transform</goal> + </goals> + </execution> + </executions> + <configuration> + <transformationSets> + <transformationSet> + <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> + 
<outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> + </transformationSet> + </transformationSets> + </configuration> + </plugin> + <!-- copy xml javadoc to class jar --> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>copy-resources</id> + <phase>process-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/classes</outputDirectory> + <resources> + <resource> + <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <argLine>-Xmx2048m</argLine> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.9.0.1</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.9.0.1</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java new file mode 100644 index 0000000..ebb46e5 --- /dev/null +++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.kafka; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; + +/** + * Interface for Kafka Consumer. It wraps around the KafkaConsumer. + */ +public interface AbstractKafkaConsumer +{ + /** + * Checks whether the consumer contains the specified partition or not + * @param topicPartition topic partition + * @return true if consumer contains the given partition, otherwise false + */ + boolean isConsumerContainsPartition(TopicPartition topicPartition); + + /** + * Seek to the specified offset for the given partition + * @param topicPartition topic partition + * @param offset given offset + */ + void seekToOffset(TopicPartition topicPartition, long offset); + + /** + * Fetch data for the topics or partitions specified using assign API. + * @param timeOut time in milliseconds, spent waiting in poll if data is not available in buffer. + * @return records + */ + ConsumerRecords<byte[], byte[]> pollRecords(long timeOut); + + /** + * Commit the specified offsets for the specified list of topics and partitions to Kafka. + * @param offsets given offsets + * @param callback Callback to invoke when the commit completes + */ + void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback); + + /** + * Assign the specified list of partitions to the consumer + * @param partitions list of partitions + */ + void assignPartitions(List<TopicPartition> partitions); + + /** + * Seek to the first offset for the specified list of partitions + * @param partitions list of partitions + */ + void seekToBeginning(TopicPartition... partitions); + + /** + * Seek to the last offser for the specified list of partitions + * @param partitions list of partitions + */ + void seekToEnd(TopicPartition... partitions); + + /** + * Wrapper for Wakeup the consumer + */ + void wakeup(); + + /** + * Return the metrics kept by the consumer + * @return metrics + */ + Map<MetricName, ? 
extends Metric> metrics(); + + /** + * Wrapper for close the consumer + */ + void close(); + + /** + * Resume all the partitions + */ + void resumeAllPartitions(); + + /** + * Return the list of partitions assigned to this consumer + * @return list of partitions + */ + Collection<TopicPartition> getPartitions(); + + /** + * Resume the specified partition + * @param tp partition + */ + void resumePartition(TopicPartition tp); + + /** + * Pause the specified partition + * @param tp partition + */ + void pausePartition(TopicPartition tp); + + /** + * Return the offset of the next record that will be fetched + * @param tp partition + */ + long positionPartition(TopicPartition tp); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java new file mode 100644 index 0000000..40747eb --- /dev/null +++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -0,0 +1,635 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.kafka; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.StatsListener; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * The abstract kafka input operator using kafka 0.9.0 new consumer API + * A scalable, fault-tolerant, at-least-once kafka input operator + * Key features includes: + * + * <ol> + * <li>Out-of-box One-to-one and one-to-many partition strategy support plus customizable partition strategy + * refer to AbstractKafkaPartitioner </li> + * <li>Fault-tolerant when the input operator goes down, it redeploys on other node</li> + * <li>At-least-once semantics for operator failure (no matter which operator fails)</li> + * <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li> + * <li>Multi-cluster support, one operator can consume data from more than one kafka clusters</li> + * <li>Multi-topic support, one operator can subscribe multiple topics</li> + * <li>Throughput control support, you can throttle number of tuple for each streaming window</li> + * </ol> + * + * @since 3.3.0 + */ +@InterfaceStability.Evolving +public abstract class AbstractKafkaInputOperator implements InputOperator, + Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener, + Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback +{ + + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class); + + static { + // We create new consumers periodically to pull metadata (Kafka consumer keeps metadata in cache) + // Skip log4j log for ConsumerConfig class to avoid too much noise in application + LogManager.getLogger(ConsumerConfig.class).setLevel(Level.WARN); + } + + public enum InitialOffset + { + EARLIEST, // consume from beginning of the partition every time when application restart + LATEST, // consume from latest of the partition every time when application restart + // consume from committed position from last run or earliest if there is no committed offset(s) + APPLICATION_OR_EARLIEST, + APPLICATION_OR_LATEST // consume from committed position from last run or latest if there is no committed offset(s) + } + + @NotNull + private String[] clusters; + + @NotNull + private 
String[] topics; + + /** + * offset track for checkpoint + */ + private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>(); + + private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> windowStartOffset = new HashMap<>(); + + private transient int operatorId; + + private int initialPartitionCount = 1; + + private long repartitionInterval = 30000L; + + private long repartitionCheckInterval = 5000L; + + @Min(1) + private int maxTuplesPerWindow = Integer.MAX_VALUE; + + /** + * By default the operator start consuming from the committed offset or the latest one + */ + private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST; + + private long metricsRefreshInterval = 5000L; + + private long consumerTimeout = 5000L; + + private int holdingBufferSize = 1024; + + private Properties consumerProps = new Properties(); + + /** + * Assignment for each operator instance + */ + private Set<AbstractKafkaPartitioner.PartitionMeta> assignment; + + //=======================All transient fields========================== + + /** + * Wrapper consumer object + * It wraps KafkaConsumer, maintains consumer thread and store messages in a queue + */ + private final transient KafkaConsumerWrapper consumerWrapper = createConsumerWrapper(); + + /** + * By default the strategy is one to one + * + * @see PartitionStrategy + */ + private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE; + + /** + * count the emitted message in each window<br> + * non settable + */ + private transient int emitCount = 0; + + /** + * store offsets with window id, only keep offsets with windows that have not been committed + */ + private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = + new LinkedList<>(); + + /** + * Application name is used as group.id for kafka consumer + */ + private transient String applicationName; + + private transient AbstractKafkaPartitioner partitioner; + + private transient long currentWindowId; + + private transient long lastCheckTime = 0L; + + private transient long lastRepartitionTime = 0L; + + @AutoMetric + private transient KafkaMetrics metrics; + + private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); + + /** + * Creates the Wrapper consumer object + * It maintains consumer thread and store messages in a queue + * @return KafkaConsumerWrapper + */ + public KafkaConsumerWrapper createConsumerWrapper() + { + return new KafkaConsumerWrapper(); + } + + // Creates the consumer object and it wraps KafkaConsumer. 
+ public abstract AbstractKafkaConsumer createConsumer(Properties prop); + + @Override + public void activate(Context.OperatorContext context) + { + consumerWrapper.start(isIdempotent()); + } + + @Override + public void deactivate() + { + consumerWrapper.stop(); + } + + @Override + public void checkpointed(long l) + { + + } + + @Override + public void beforeCheckpoint(long windowId) + { + + } + + @Override + public void committed(long windowId) + { + if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) { + return; + } + //ask kafka consumer wrapper to store the committed offsets + for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = + offsetHistory.iterator(); iter.hasNext(); ) { + Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next(); + if (item.getLeft() <= windowId) { + if (item.getLeft() == windowId) { + consumerWrapper.commitOffsets(item.getRight()); + } + iter.remove(); + } + } + if (isIdempotent()) { + try { + windowDataManager.committed(windowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + } + + @Override + public void emitTuples() + { + int count = consumerWrapper.messageSize(); + if (maxTuplesPerWindow > 0) { + count = Math.min(count, maxTuplesPerWindow - emitCount); + } + for (int i = 0; i < count; i++) { + Pair<String, ConsumerRecord<byte[], byte[]>> tuple = consumerWrapper.pollMessage(); + ConsumerRecord<byte[], byte[]> msg = tuple.getRight(); + emitTuple(tuple.getLeft(), msg); + AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(), + msg.topic(), msg.partition()); + offsetTrack.put(pm, msg.offset() + 1); + if (isIdempotent() && !windowStartOffset.containsKey(pm)) { + windowStartOffset.put(pm, msg.offset()); + } + } + emitCount += count; + } + + protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message); + + @Override + public void beginWindow(long wid) + { + emitCount = 0; + currentWindowId = wid; + windowStartOffset.clear(); + if (isIdempotent() && wid <= windowDataManager.getLargestCompletedWindow()) { + replay(wid); + } else { + consumerWrapper.afterReplay(); + } + } + + private void replay(long windowId) + { + try { + @SuppressWarnings("unchecked") + Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = + (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.retrieve(windowId); + consumerWrapper.emitImmediately(windowData); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + @Override + public void endWindow() + { + // copy current offset track to history memory + Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new HashMap<>(offsetTrack); + offsetHistory.add(Pair.of(currentWindowId, offsetsWithWindow)); + + //update metrics + metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics()); + + //update the windowDataManager + if (isIdempotent()) { + try { + Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = new HashMap<>(); + for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) { + windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue())); + } + windowDataManager.save(windowData, currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + } + + @Override + public void setup(Context.OperatorContext context) + { + applicationName = 
context.getValue(Context.DAGContext.APPLICATION_NAME); + consumerWrapper.create(this); + metrics = new KafkaMetrics(metricsRefreshInterval); + windowDataManager.setup(context); + operatorId = context.getId(); + } + + @Override + public void teardown() + { + windowDataManager.teardown(); + } + + private void initPartitioner() + { + if (partitioner == null) { + logger.info("Initialize Partitioner"); + switch (strategy) { + case ONE_TO_ONE: + partitioner = new OneToOnePartitioner(clusters, topics, this); + break; + case ONE_TO_MANY: + partitioner = new OneToManyPartitioner(clusters, topics, this); + break; + case ONE_TO_MANY_HEURISTIC: + throw new UnsupportedOperationException("Not implemented yet"); + default: + throw new RuntimeException("Invalid strategy"); + } + logger.info("Actual Partitioner is {}", partitioner.getClass()); + } + + } + + @Override + public Response processStats(BatchedOperatorStats batchedOperatorStats) + { + long t = System.currentTimeMillis(); + if (repartitionInterval < 0 || repartitionCheckInterval < 0 || + t - lastCheckTime < repartitionCheckInterval || t - lastRepartitionTime < repartitionInterval) { + // return false if it's within repartitionCheckInterval since last time it check the stats + Response response = new Response(); + response.repartitionRequired = false; + return response; + } + + try { + logger.debug("Process stats"); + initPartitioner(); + return partitioner.processStats(batchedOperatorStats); + } finally { + lastCheckTime = System.currentTimeMillis(); + } + } + + @Override + public Collection<Partition<AbstractKafkaInputOperator>> definePartitions( + Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext) + { + logger.debug("Define partitions"); + initPartitioner(); + return partitioner.definePartitions(collection, partitioningContext); + } + + @Override + public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map) + { + // update the last repartition time + lastRepartitionTime = System.currentTimeMillis(); + initPartitioner(); + partitioner.partitioned(map); + } + + /** + * A callback from consumer after it commits the offset + * + * @param map + * @param e + */ + public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) + { + if (logger.isDebugEnabled()) { + logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map)); + } + if (e != null) { + logger.warn("Exceptions in committing offsets {} : {} ", + Joiner.on(';').withKeyValueSeparator("=").join(map), e); + } + } + + public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment) + { + this.assignment = assignment; + } + + public Set<AbstractKafkaPartitioner.PartitionMeta> assignment() + { + return assignment; + } + + private boolean isIdempotent() + { + return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager); + } + + //---------------------------------------------setters and getters---------------------------------------- + public void setInitialPartitionCount(int partitionCount) + { + this.initialPartitionCount = partitionCount; + } + + /** + * initial partition count + * only used with PartitionStrategy.ONE_TO_MANY + * or customized strategy + */ + public int getInitialPartitionCount() + { + return initialPartitionCount; + } + + public void setClusters(String clusters) + { + this.clusters = clusters.split(";"); + } + + /** + * Same setting as bootstrap.servers property to KafkaConsumer + * refer to 
http://kafka.apache.org/documentation.html#newconsumerconfigs + * To support multi cluster, you can have multiple bootstrap.servers separated by ";" + */ + public String getClusters() + { + return Joiner.on(';').join(clusters); + } + + public void setTopics(String topics) + { + this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics), String.class); + } + + /** + * The topics the operator consumes, separate by',' + * Topic name can only contain ASCII alphanumerics, '.', '_' and '-' + */ + public String getTopics() + { + return Joiner.on(", ").join(topics); + } + + public void setStrategy(String policy) + { + this.strategy = PartitionStrategy.valueOf(policy.toUpperCase()); + } + + public String getStrategy() + { + return strategy.name(); + } + + public void setInitialOffset(String initialOffset) + { + this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase()); + } + + /** + * Initial offset, it should be one of the following + * <ul> + * <li>earliest</li> + * <li>latest</li> + * <li>application_or_earliest</li> + * <li>application_or_latest</li> + * </ul> + */ + public String getInitialOffset() + { + return initialOffset.name(); + } + + public String getApplicationName() + { + return applicationName; + } + + public void setConsumerProps(Properties consumerProps) + { + this.consumerProps = consumerProps; + } + + /** + * Extra kafka consumer properties + * http://kafka.apache.org/090/documentation.html#newconsumerconfigs + * + * Please be aware that the properties below are set by the operator, don't override it + * + * <ul> + * <li>bootstrap.servers</li> + * <li>group.id</li> + * <li>auto.offset.reset</li> + * <li>enable.auto.commit</li> + * <li>partition.assignment.strategy</li> + * <li>key.deserializer</li> + * <li>value.deserializer</li> + * </ul> + */ + public Properties getConsumerProps() + { + return consumerProps; + } + + public void setMaxTuplesPerWindow(int maxTuplesPerWindow) + { + this.maxTuplesPerWindow = maxTuplesPerWindow; + } + + /** + * maximum tuples allowed to be emitted in each window + */ + public int getMaxTuplesPerWindow() + { + return maxTuplesPerWindow; + } + + /** + * @see <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"> + * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a> + */ + public long getConsumerTimeout() + { + return consumerTimeout; + } + + public void setConsumerTimeout(long consumerTimeout) + { + this.consumerTimeout = consumerTimeout; + } + + /** + * Number of messages kept in memory waiting for emission to downstream operator + */ + public int getHoldingBufferSize() + { + return holdingBufferSize; + } + + public void setHoldingBufferSize(int holdingBufferSize) + { + this.holdingBufferSize = holdingBufferSize; + } + + /** + * metrics refresh interval + */ + public long getMetricsRefreshInterval() + { + return metricsRefreshInterval; + } + + public void setMetricsRefreshInterval(long metricsRefreshInterval) + { + this.metricsRefreshInterval = metricsRefreshInterval; + } + + public void setRepartitionCheckInterval(long repartitionCheckInterval) + { + this.repartitionCheckInterval = repartitionCheckInterval; + } + + /** + * Minimal interval between checking collected stats and decide whether it needs to repartition or not. 
+ * And minimal interval between 2 offset updates + */ + public long getRepartitionCheckInterval() + { + return repartitionCheckInterval; + } + + public void setRepartitionInterval(long repartitionInterval) + { + this.repartitionInterval = repartitionInterval; + } + + /** + * Minimal interval between 2 (re)partition actions + */ + public long getRepartitionInterval() + { + return repartitionInterval; + } + + public void setWindowDataManager(WindowDataManager windowDataManager) + { + this.windowDataManager = windowDataManager; + } + + public WindowDataManager getWindowDataManager() + { + return windowDataManager; + } + + /** + * @return current checkpointed offsets + * @omitFromUI + */ + public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack() + { + return offsetTrack; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java new file mode 100644 index 0000000..0e16fe1 --- /dev/null +++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.kafka; + +import java.util.Properties; + +import javax.validation.constraints.NotNull; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +/** + * This is the base implementation of a Kafka output operator(0.9.0), which writes data to the Kafka message bus. + * + * @displayName Abstract Kafka Output + * @category Messaging + * @tags output operator + * + * + * @since 3.5.0 + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class AbstractKafkaOutputOperator<K, V> implements Operator +{ + private transient Producer<K, V> producer; + @NotNull + private String topic; + private Properties properties = new Properties(); + + @Override + public void setup(Context.OperatorContext context) + { + producer = new KafkaProducer<K, V>(properties); + } + + /** + * Implement Component Interface. + */ + @Override + public void teardown() + { + producer.close(); + } + + /** + * Implement Operator Interface. + */ + @Override + public void beginWindow(long windowId) + { + } + + /** + * Implement Operator Interface. 
+ */ + @Override + public void endWindow() + { + } + + public Properties getProperties() + { + return properties; + } + + /** + * Set the Kafka producer properties. + * + * @param properties Producer properties + */ + public void setProperties(Properties properties) + { + this.properties.putAll(properties); + } + + /** + * Set the Kafka producer property. + * + * @param key Producer Property name + * @param val Producer Property value + */ + public void setProperty(Object key, Object val) + { + properties.put(key, val); + } + + public String getTopic() + { + return topic; + } + + /** + * Set the Kafka topic + * @param topic Kafka topic for which the data is sent + */ + public void setTopic(String topic) + { + this.topic = topic; + } + + protected Producer<K, V> getProducer() + { + return producer; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java new file mode 100644 index 0000000..791972f --- /dev/null +++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.kafka; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import com.google.common.base.Joiner; + +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.StatsListener; +import com.datatorrent.lib.util.KryoCloneUtils; + +/** + * Abstract partitioner used to manage the partitions of kafka input operator. + * It use a number of kafka consumers(one for each cluster) to get the latest partition metadata for topics that + * the consumer subscribes and expose those to subclass which implements the assign method + * + * The partitioner is always stateless. 
+ * + * @since 3.3.0 + */ +@InterfaceStability.Evolving +public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKafkaInputOperator>, StatsListener +{ + + private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaPartitioner.class); + + private static final String META_CONSUMER_GROUP_NAME = AbstractKafkaInputOperator.class.getName() + "META_GROUP"; + + protected final String[] clusters; + + protected final String[] topics; + + protected final AbstractKafkaInputOperator prototypeOperator; + + private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients; + + // prevent null + private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); + + public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator) + { + this.clusters = clusters; + this.topics = topics; + this.prototypeOperator = prototypeOperator; + } + + abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata); + + @Override + public Collection<Partition<AbstractKafkaInputOperator>> definePartitions( + Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext) + { + initMetadataClients(); + + Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>(); + + try { + for (int i = 0; i < clusters.length; i++) { + metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>()); + for (String topic : topics) { + //try several time if partitionsFor(topic) returns null or throws exception. + //partitionsFor(topic) will return null if the topic is invalid or hasn't completed + int tryTime = 10; + while (tryTime-- > 0) { + try { + List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic); + if (ptis != null) { + if (logger.isDebugEnabled()) { + logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis)); + } + metadata.get(clusters[i]).put(topic, ptis); + break; + } + + logger.warn("Partition metadata for topic {} is null. retrying...", topic); + + } catch (Exception e) { + logger.warn("Got Exception when trying get partition info for topic {}.", topic, e); + } + + try { + Thread.sleep(100); + } catch (Exception e1) { + //ignore + } + } //end while + + if (tryTime == 0) { + throw new RuntimeException( + "Get partition info for topic completely failed. Please check the log file. 
topic name: " + topic); + } + } + } + } finally { + closeClients(); + } + + List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null; + try { + parts = assign(metadata); + } catch (Exception e) { + logger.error("assign() exception.", e); + e.printStackTrace(); + } + + if (currentPartitions == parts || currentPartitions.equals(parts)) { + logger.debug("No partition change found"); + return collection; + } else { + logger.info("Partition change detected: "); + currentPartitions.clear(); + currentPartitions.addAll(parts); + int i = 0; + List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>(); + for (Iterator<Partition<AbstractKafkaInputOperator>> iter = collection.iterator(); iter.hasNext(); ) { + Partition<AbstractKafkaInputOperator> nextPartition = iter.next(); + if (parts.remove(nextPartition.getPartitionedInstance().assignment())) { + if (logger.isInfoEnabled()) { + logger.info("[Existing] Partition {} with assignment {} ", i, + Joiner.on(';').join(nextPartition.getPartitionedInstance().assignment())); + } + result.add(nextPartition); + i++; + } + } + + for (Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment : parts) { + if (logger.isInfoEnabled()) { + logger.info("[New] Partition {} with assignment {} ", i, + Joiner.on(';').join(partitionAssignment)); + } + result.add(createPartition(partitionAssignment)); + i++; + } + + return result; + } + } + + protected void closeClients() + { + for (KafkaConsumer<byte[], byte[]> consume : metadataRefreshClients) { + consume.close(); + } + metadataRefreshClients = null; + } + + @Override + public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map) + { + + } + + @Override + public Response processStats(BatchedOperatorStats batchedOperatorStats) + { + Response response = new Response(); + response.repartitionRequired = true; + return response; + } + + protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition( + Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment) + { + Partitioner.Partition<AbstractKafkaInputOperator> p = + new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator)); + p.getPartitionedInstance().assign(partitionAssignment); + return p; + } + + /** + * + */ + private void initMetadataClients() + { + if (metadataRefreshClients != null && metadataRefreshClients.size() == clusters.length) { + // The metadata client is active + return; + } + + if (clusters == null || clusters.length == 0) { + throw new IllegalStateException("clusters can not be null"); + } + + metadataRefreshClients = new ArrayList<>(clusters.length); + int index = 0; + for (String c : clusters) { + Properties prop = prototypeOperator.getConsumerProps(); + prop.put("group.id", META_CONSUMER_GROUP_NAME); + prop.put("bootstrap.servers", c); + prop.put("key.deserializer", ByteArrayDeserializer.class.getName()); + prop.put("value.deserializer", ByteArrayDeserializer.class.getName()); + prop.put("enable.auto.commit", "false"); + if (logger.isInfoEnabled()) { + logger.info("Consumer Properties : {} ", getPropertyAsString(prop)); + } + metadataRefreshClients.add(index++, new KafkaConsumer<byte[], byte[]>(prop)); + } + } + + /** + * Converts the property list (key and element pairs) to String format + * This format is used to print to a Stream for debugging. 
+ * @param prop + * @return String + */ + private String getPropertyAsString(Properties prop) + { + StringWriter writer = new StringWriter(); + try { + prop.store(writer, ""); + } catch (IOException e) { + logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage()); + } + return writer.getBuffer().toString(); + } + + /** + * The key object used in the assignment map for each operator + */ + public static class PartitionMeta + { + + public PartitionMeta() + { + } + + public PartitionMeta(String cluster, String topic, int partitionId) + { + this.cluster = cluster; + this.topic = topic; + this.partitionId = partitionId; + this.topicPartition = new TopicPartition(topic, partitionId); + } + + private String cluster; + + private transient TopicPartition topicPartition; + + private String topic; + + private int partitionId; + + public String getCluster() + { + return cluster; + } + + public int getPartitionId() + { + return partitionId; + } + + public String getTopic() + { + return topic; + } + + public TopicPartition getTopicPartition() + { + if (topicPartition == null) { + topicPartition = new TopicPartition(topic, partitionId); + } + return topicPartition; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionMeta that = (PartitionMeta)o; + return Objects.equals(partitionId, that.partitionId) && + Objects.equals(cluster, that.cluster) && + Objects.equals(topic, that.topic); + } + + @Override + public int hashCode() + { + return Objects.hash(cluster, topic, partitionId); + } + + @Override + public String toString() + { + return "PartitionMeta{" + + "cluster='" + cluster + '\'' + + ", topicPartition=" + getTopicPartition() + + '}'; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---------------------------------------------------------------------- diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java new file mode 100644 index 0000000..efd0750 --- /dev/null +++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is the wrapper class for the new Kafka consumer API.
+ *
+ * It starts one consumer per cluster, each in its own thread, and maintains the consumer offsets.
+ *
+ * It also uses the consumers to commit the offsets processed by the application, under the application name.
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class KafkaConsumerWrapper implements Closeable
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+
+  private AtomicBoolean isAlive = new AtomicBoolean(false);
+
+  private final Map<String, AbstractKafkaConsumer> consumers = new HashMap<>();
+
+  // The in-memory buffer holding consumed messages
+  private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> holdingBuffer;
+
+  private AbstractKafkaInputOperator ownerOperator = null;
+
+  private ExecutorService kafkaConsumerExecutor;
+
+  private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>();
+
+  private boolean waitForReplay = false;
+
+  /**
+   * Only the offsets that need to be committed should be put into the ConsumerThread.offsetToCommit map;
+   * the consumer thread will commit the offset(s).
+   *
+   * @param offsetsInWindow
+   */
+  public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsInWindow)
+  {
+    if (offsetsInWindow == null) {
+      return;
+    }
+
+    // group offsets by cluster and topic partition
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : offsetsInWindow.entrySet()) {
+      String cluster = e.getKey().getCluster();
+      Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMap = offsetsToCommit.get(cluster);
+      if (topicPartitionOffsetMap == null) {
+        logger.warn("committed offset map should be initialized by consumer thread!");
+        continue;
+      }
+      topicPartitionOffsetMap.put(e.getKey().getTopicPartition(), new OffsetAndMetadata(e.getValue()));
+    }
+
+  }
+
+  public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData)
+  {
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowEntry : windowData.entrySet()) {
+      AbstractKafkaPartitioner.PartitionMeta meta = windowEntry.getKey();
+      Pair<Long, Long> replayOffsetSize = windowEntry.getValue();
+      AbstractKafkaConsumer kc = consumers.get(meta.getCluster());
+      if (kc == null || !kc.isConsumerContainsPartition(windowEntry.getKey().getTopicPartition())) {
+        throw new RuntimeException("Couldn't find consumer to replay the message PartitionMeta : " + meta);
+      }
+      // pause the other partitions
+      for (TopicPartition tp : kc.getPartitions()) {
+        if (meta.getTopicPartition().equals(tp)) {
+          kc.resumePartition(tp);
+        } else {
+          try {
+            kc.positionPartition(tp);
+          } catch (NoOffsetForPartitionException e) {
+            // the poll() method of a consumer will throw an exception
+            // if any of the subscribed partitions is not initialized with a position
+            handleNoOffsetForPartitionException(e, kc);
+          }
+          kc.pausePartition(tp);
+        }
+      }
+      // set the offset to the window start offset
+      kc.seekToOffset(meta.getTopicPartition(), replayOffsetSize.getLeft());
+      long windowCount = replayOffsetSize.getRight();
+      while (windowCount > 0) {
+        try {
+          ConsumerRecords<byte[], byte[]> records = kc.pollRecords(ownerOperator.getConsumerTimeout());
+          for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0; ) {
+            ownerOperator.emitTuple(meta.getCluster(), cri.next());
+            windowCount--;
+          }
+        } catch (NoOffsetForPartitionException e) {
+          throw new RuntimeException("Couldn't replay the offset", e);
+        }
+      }
+      // set the offset to just after the window
+      kc.seekToOffset(meta.getTopicPartition(), replayOffsetSize.getLeft() + replayOffsetSize.getRight());
+    }
+
+    // resume all topics
+    for (AbstractKafkaConsumer kc : consumers.values()) {
+      kc.resumeAllPartitions();
+    }
+
+  }
+
+  public void afterReplay()
+  {
+    waitForReplay = false;
+  }
+
+  static final class ConsumerThread implements Runnable
+  {
+
+    private final AbstractKafkaConsumer consumer;
+
+    private final String cluster;
+
+    private final KafkaConsumerWrapper wrapper;
+
+    private Map<TopicPartition, OffsetAndMetadata> offsetToCommit = null;
+
+    public ConsumerThread(String cluster, AbstractKafkaConsumer consumer, KafkaConsumerWrapper wrapper)
+    {
+      this.cluster = cluster;
+      this.consumer = consumer;
+      this.wrapper = wrapper;
+      this.offsetToCommit = new ConcurrentHashMap<>();
+      wrapper.offsetsToCommit.put(cluster, offsetToCommit);
+    }
+
+    @Override
+    public void run()
+    {
+      try {
+
+        while (wrapper.isAlive.get()) {
+          if (wrapper.waitForReplay) {
+            Thread.sleep(100);
+            continue;
+          }
+          if (!this.offsetToCommit.isEmpty()) {
+            // in each fetch cycle commit the offsets if needed
+            if (logger.isDebugEnabled()) {
+              logger.debug("Commit offsets {}", Joiner.on(';').withKeyValueSeparator("=").join(this.offsetToCommit));
+            }
+            consumer.commitAsync(offsetToCommit, wrapper.ownerOperator);
+            offsetToCommit.clear();
+          }
+          try {
+            ConsumerRecords<byte[], byte[]> records = consumer.pollRecords(wrapper.ownerOperator.getConsumerTimeout());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+              wrapper.putMessage(Pair.of(cluster, record));
+            }
+          } catch (NoOffsetForPartitionException e) {
+            wrapper.handleNoOffsetForPartitionException(e, consumer);
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e);
+          }
+        }
+      } catch (WakeupException we) {
+        logger.info("The consumer is being stopped");
+      } catch (InterruptedException e) {
+        DTThrowable.rethrow(e);
+      } finally {
+        consumer.close();
+      }
+    }
+  }
+
+  protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
+      AbstractKafkaConsumer consumer)
+  {
+    // if initialOffset is set to EARLIEST or LATEST
+    // and the application is run for the first time,
+    // there is no existing committed offset and this exception will be caught;
+    // we need to seek to either the beginning or the end of the partition
+    // based on the initial offset setting
+    AbstractKafkaInputOperator.InitialOffset io =
+        AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+    if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
+        || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
+      consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
+    } else {
+      consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
+    }
+
+  }
+
+  /**
+   * This method is called in the setup method of the AbstractKafkaInputOperator
+   */
+  public void create(AbstractKafkaInputOperator ownerOperator)
+  {
+    holdingBuffer = new ArrayBlockingQueue<>(ownerOperator.getHoldingBufferSize());
+    this.ownerOperator = ownerOperator;
+    logger.info("Create consumer wrapper with holding buffer size: {} ", ownerOperator.getHoldingBufferSize());
+    if (logger.isInfoEnabled()) {
+      logger.info("Assignments are {} ", Joiner.on('\n').join(ownerOperator.assignment()));
+    }
+  }
+
+  /**
+   * This method is called in the activate method of the operator
+   */
+  public void start(boolean waitForReplay)
+  {
+    this.waitForReplay = waitForReplay;
+    isAlive.set(true);
+
+    // create a thread pool for the consumer threads that consume the Kafka data
+    kafkaConsumerExecutor = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());
+
+    // group the list of PartitionMeta by cluster
+    Map<String, List<TopicPartition>> consumerAssignment = new HashMap<>();
+    Set<AbstractKafkaPartitioner.PartitionMeta> assignments = ownerOperator.assignment();
+    for (AbstractKafkaPartitioner.PartitionMeta partitionMeta : assignments) {
+      String cluster = partitionMeta.getCluster();
+      List<TopicPartition> cAssignment = consumerAssignment.get(cluster);
+      if (cAssignment == null) {
+        cAssignment = new LinkedList<>();
+        consumerAssignment.put(cluster, cAssignment);
+      }
+      cAssignment.add(new TopicPartition(partitionMeta.getTopic(), partitionMeta.getPartitionId()));
+    }
+
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack();
+
+    // create one thread for each cluster;
+    // each thread uses one KafkaConsumer to consume from 1+ partition(s) of 1+ topic(s)
+    for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {
+
+      Properties prop = new Properties();
+      if (ownerOperator.getConsumerProps() != null) {
+        prop.putAll(ownerOperator.getConsumerProps());
+      }
+
+      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, e.getKey());
+      prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+      // never auto commit the offsets
+      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+      AbstractKafkaInputOperator.InitialOffset initialOffset =
+          AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+
+      if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST ||
+          initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
+        // commit the offsets under the application name if initialOffset is set to application
+        prop.put(ConsumerConfig.GROUP_ID_CONFIG, ownerOperator.getApplicationName() + "_Consumer");
+      }
+
+      AbstractKafkaConsumer kc = ownerOperator.createConsumer(prop);
+      kc.assignPartitions(e.getValue());
+      if (logger.isInfoEnabled()) {
+        logger.info("Create consumer with properties {} ", Joiner.on(";").withKeyValueSeparator("=").join(prop));
+        logger.info("Assign consumer to {}", Joiner.on('#').join(e.getValue()));
+      }
+      if (currentOffset != null && !currentOffset.isEmpty()) {
+        for (TopicPartition tp : e.getValue()) {
+          AbstractKafkaPartitioner.PartitionMeta partitionKey =
+              new AbstractKafkaPartitioner.PartitionMeta(e.getKey(), tp.topic(), tp.partition());
+          if (currentOffset.containsKey(partitionKey)) {
+            kc.seekToOffset(tp, currentOffset.get(partitionKey));
+          }
+        }
+      }
+
+      consumers.put(e.getKey(), kc);
+      kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
+    }
+
+  }
+
+  /**
+   * This method is called in the deactivate method of the operator
+   */
+  public void stop()
+  {
+    isAlive.set(false);
+    for (AbstractKafkaConsumer c : consumers.values()) {
+      c.wakeup();
+    }
+    kafkaConsumerExecutor.shutdownNow();
+    holdingBuffer.clear();
+    IOUtils.closeQuietly(this);
+  }
+
+  /**
+   * This method is called in the teardown method of the operator
+   */
+  public void teardown()
+  {
+    holdingBuffer.clear();
+  }
+
+  public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
+  {
+    return holdingBuffer.poll();
+  }
+
+  public int messageSize()
+  {
+    return holdingBuffer.size();
+  }
+
+  protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> msg) throws InterruptedException
+  {
+    // blocks the consumer thread when the buffer is full, so no more messages are fetched
+    holdingBuffer.put(msg);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+  }
+
+  public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics()
+  {
+    Map<String, Map<MetricName, ? extends Metric>> val = new HashMap<>();
+    for (Map.Entry<String, AbstractKafkaConsumer> e : consumers.entrySet()) {
+      val.put(e.getKey(), e.getValue().metrics());
+    }
+    return val;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
new file mode 100644
index 0000000..75449a1
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * Metrics class for the Kafka consumer
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class KafkaMetrics implements Serializable
+{
+  private KafkaConsumerStats[] stats;
+
+  private transient long lastMetricSampleTime = 0L;
+
+  private transient long metricsRefreshInterval;
+
+  public KafkaMetrics(long metricsRefreshInterval)
+  {
+    this.metricsRefreshInterval = metricsRefreshInterval;
+  }
+
+  void updateMetrics(String[] clusters, Map<String, Map<MetricName, ? extends Metric>> metricsMap)
+  {
+    long current = System.currentTimeMillis();
+    if (current - lastMetricSampleTime < metricsRefreshInterval) {
+      return;
+    }
+
+    lastMetricSampleTime = current;
+
+    if (stats == null) {
+      stats = new KafkaConsumerStats[clusters.length];
+    }
+
+    for (int i = 0; i < clusters.length; i++) {
+      if (stats[i] == null) {
+        stats[i] = new KafkaConsumerStats();
+        stats[i].cluster = clusters[i];
+      }
+      Map<MetricName, ? extends Metric> cMetrics = metricsMap.get(clusters[i]);
+      if (cMetrics == null || cMetrics.isEmpty()) {
+        stats[i].bytesPerSec = 0;
+        stats[i].msgsPerSec = 0;
+        continue;
+      }
+      if (stats[i].bytePerSecMK == null || stats[i].msgPerSecMK == null) {
+        for (MetricName mn : cMetrics.keySet()) {
+          if (mn.name().equals("bytes-consumed-rate")) {
+            stats[i].bytePerSecMK = mn;
+          } else if (mn.name().equals("records-consumed-rate")) {
+            stats[i].msgPerSecMK = mn;
+          }
+        }
+      }
+      stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
+      stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();
+    }
+  }
+
+  public KafkaConsumerStats[] getStats()
+  {
+    return stats;
+  }
+
+  /**
+   * Counter class which holds the statistic values from the consumer
+   */
+  public static class KafkaConsumerStats implements Serializable
+  {
+    private static final long serialVersionUID = -2867402654990209006L;
+
+    public transient MetricName msgPerSecMK;
+    public transient MetricName bytePerSecMK;
+
+    public String cluster;
+    /**
+     * Metrics for each consumer
+     */
+    public double msgsPerSec;
+
+    public double bytesPerSec;
+
+    public KafkaConsumerStats()
+    {
+    }
+  }
+
+  public static class KafkaMetricsAggregator implements AutoMetric.Aggregator, Serializable
+  {
+
+    @Override
+    public Map<String, Object> aggregate(long l, Collection<AutoMetric.PhysicalMetricsContext> collection)
+    {
+      double totalBytesPerSec = 0;
+      double totalMsgsPerSec = 0;
+      Map<String, Object> total = new HashMap<>();
+      for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+        KafkaMetrics km = (KafkaMetrics)pmc.getMetrics().get("metrics");
+        for (KafkaConsumerStats kcs : km.stats) {
+          totalBytesPerSec += kcs.bytesPerSec;
+          totalMsgsPerSec += kcs.msgsPerSec;
+        }
+      }
+      total.put("totalBytesPerSec", totalBytesPerSec);
+      total.put("totalMsgPerSec", totalMsgsPerSec);
+      return total;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
new file mode 100644
index 0000000..a07fe33
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * @since 2.1.0
+ */
+@InterfaceStability.Evolving
+public class KafkaPartition implements Serializable
+{
+  protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster";
+
+  @SuppressWarnings("unused")
+  private KafkaPartition()
+  {
+  }
+
+  public KafkaPartition(String topic, int partitionId)
+  {
+    this(DEFAULT_CLUSTERID, topic, partitionId);
+  }
+
+  public KafkaPartition(String clusterId, String topic, int partitionId)
+  {
+    super();
+    this.clusterId = clusterId;
+    this.partitionId = partitionId;
+    this.topic = topic;
+  }
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 7556802229202221546L;
+
+  private String clusterId;
+
+  private int partitionId;
+
+  private String topic;
+
+  public String getClusterId()
+  {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId)
+  {
+    this.clusterId = clusterId;
+  }
+
+  public int getPartitionId()
+  {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId)
+  {
+    this.partitionId = partitionId;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
+    result = prime * result + partitionId;
+    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    KafkaPartition other = (KafkaPartition)obj;
+    if (clusterId == null) {
+      if (other.clusterId != null) {
+        return false;
+      }
+    } else if (!clusterId.equals(other.clusterId)) {
+      return false;
+    }
+    if (partitionId != other.partitionId) {
+      return false;
+    }
+    if (topic == null) {
+      if (other.topic != null) {
+        return false;
+      }
+    } else if (!topic.equals(other.topic)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
new file mode 100644
index 0000000..c47cf3d
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.datatorrent.api.DefaultInputPort;
+
+/**
+ * Kafka output operator with a single input port (inputPort).
+ * It supports at-least-once processing guarantees
+ *
+ * @since 3.5.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator
+{
+  /**
+   * This input port receives tuples that will be written out to Kafka.
+   */
+  public final transient DefaultInputPort<V> inputPort = new DefaultInputPort<V>()
+  {
+    @Override
+    public void process(V tuple)
+    {
+      getProducer().send(new ProducerRecord<K, V>(getTopic(), tuple));
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
new file mode 100644
index 0000000..eb0cc40
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * A one-to-many partitioner implementation that creates a fixed number of operator partitions and assigns one or
+ * more Kafka partitions to each. It uses round-robin to assign partitions
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class OneToManyPartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToManyPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator protoTypeOperator)
+  {
+    super(clusters, topics, protoTypeOperator);
+  }
+
+  @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata)
+  {
+    if (prototypeOperator.getInitialPartitionCount() <= 0) {
+      throw new IllegalArgumentException("Number of partitions should be greater than or equal to 1");
+    }
+
+    int partitionCount = prototypeOperator.getInitialPartitionCount();
+    ArrayList<Set<PartitionMeta>> eachPartitionAssignment =
+        new ArrayList<>(prototypeOperator.getInitialPartitionCount());
+    int i = 0;
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
+        for (PartitionInfo pif : topicPartition.getValue()) {
+          int index = i++ % partitionCount;
+          if (index >= eachPartitionAssignment.size()) {
+            eachPartitionAssignment.add(new HashSet<PartitionMeta>());
+          }
+          eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(),
+              topicPartition.getKey(), pif.partition()));
+        }
+      }
+    }
+
+    return eachPartitionAssignment;
+  }
+
+}
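
----------------------------------------------------------------------
The assign() method above deals Kafka partitions out to operator partitions with a simple modulo index. As a quick illustration, here is a minimal, self-contained sketch of the same round-robin idea; the class name and the string stand-ins for PartitionMeta are hypothetical and exist only for this example.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinAssignDemo
{
  public static void main(String[] args)
  {
    int operatorPartitionCount = 2;                   // plays the role of getInitialPartitionCount()
    List<String> kafkaPartitions = new ArrayList<>();
    for (int p = 0; p < 5; p++) {
      kafkaPartitions.add("cluster0:testTopic:" + p); // stands in for a PartitionMeta
    }

    // deal the Kafka partitions out one at a time, wrapping around via modulo
    List<Set<String>> assignment = new ArrayList<>(operatorPartitionCount);
    int i = 0;
    for (String meta : kafkaPartitions) {
      int index = i++ % operatorPartitionCount;
      if (index >= assignment.size()) {
        assignment.add(new HashSet<String>());
      }
      assignment.get(index).add(meta);
    }

    // operator 0 gets partitions 0, 2, 4; operator 1 gets partitions 1, 3
    System.out.println(assignment);
  }
}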
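KafkaConsumerWrapper.putMessage() (earlier in this diff) gets its backpressure from ArrayBlockingQueue.put(), which blocks the consumer thread once getHoldingBufferSize() records are waiting, so the wrapper never reads far ahead of what the operator thread emits. Below is a tiny sketch of that behavior with hypothetical names and a deliberately small capacity.

import java.util.concurrent.ArrayBlockingQueue;

public class HoldingBufferDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    // capacity 2 plays the role of the holding buffer size
    final ArrayBlockingQueue<String> holdingBuffer = new ArrayBlockingQueue<>(2);

    Thread consumerThread = new Thread(new Runnable()
    {
      @Override
      public void run()
      {
        try {
          for (int i = 0; i < 5; i++) {
            // put() blocks once two records are waiting, pausing the poll loop
            holdingBuffer.put("record-" + i);
            System.out.println("buffered record-" + i);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    consumerThread.start();

    // the "operator" drains slowly; each poll() frees one slot for another put()
    for (int i = 0; i < 5; i++) {
      Thread.sleep(200);
      System.out.println("emitted " + holdingBuffer.poll());
    }
    consumerThread.join();
  }
}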
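Finally, a hedged usage sketch of how the refactored input operator might be wired into an Apex application. The broker address, topic name, and the ConsoleOutputOperator sink are assumptions for illustration only; the setters mirror those exposed by the input operator in this commit, but this snippet is a sketch, not part of the change.

import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class KafkaReadDemo implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in =
        dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
    in.setClusters("localhost:9092");               // assumed local broker
    in.setTopics("demoTopic");                      // assumed topic name
    in.setInitialOffset("APPLICATION_OR_EARLIEST"); // resume from committed offsets on restart
    in.setStrategy("ONE_TO_ONE");                   // one operator partition per Kafka partition

    // assumed sink for illustration; any downstream operator would do
    ConsoleOutputOperator out = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("records", in.outputPort, out.input);
  }
}
----------------------------------------------------------------------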