[FLINK-3058] Add support for Kafka 0.9.0.0 For adding Kafka 0.9.0.0 support, this commit changes the following: - Split up of the kafka connector into a flink-connector-kafka-(base|0.9|0.8) with different dependencies - The base package contains common test cases, classes and implementations (the producer for 0.9 and 0.8 relies on exactly the same code) - the 0.8 package contains a kafka connector implementation against the SimpleConsumer (low level) API of Kafka 0.8. There are some additional tests for the ZK offset committing - The 0.9 package relies on the new Consumer API of Kafka 0.9.0.0 - Support for metrics for all producers and the 0.9 consumer through Flink's accumulators.
This closes #1489 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81320c1c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81320c1c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81320c1c Branch: refs/heads/master Commit: 81320c1c7ee98b9a663998df51cc4d5aa73d9b2a Parents: 83fb2fa Author: Robert Metzger <[email protected]> Authored: Wed Dec 16 17:29:42 2015 +0100 Committer: Robert Metzger <[email protected]> Committed: Wed Jan 20 20:30:50 2016 +0100 ---------------------------------------------------------------------- .gitignore | 1 + docs/apis/streaming/connectors/kafka.md | 67 +- docs/apis/streaming/index.md | 4 +- .../flink/api/java/typeutils/TypeExtractor.java | 4 +- .../webmonitor/RuntimeMonitorHandler.java | 6 + .../flink-connector-kafka-0.8/pom.xml | 160 ++ .../connectors/kafka/FlinkKafkaConsumer08.java | 592 +++++++ .../connectors/kafka/FlinkKafkaConsumer081.java | 39 + .../connectors/kafka/FlinkKafkaConsumer082.java | 39 + .../connectors/kafka/FlinkKafkaProducer.java | 64 + .../connectors/kafka/FlinkKafkaProducer08.java | 128 ++ .../connectors/kafka/internals/Fetcher.java | 81 + .../kafka/internals/LegacyFetcher.java | 652 ++++++++ .../kafka/internals/OffsetHandler.java | 56 + .../kafka/internals/PartitionerWrapper.java | 49 + .../kafka/internals/ZookeeperOffsetHandler.java | 146 ++ .../connectors/kafka/Kafka08ITCase.java | 266 ++++ .../connectors/kafka/Kafka08ProducerITCase.java | 33 + .../connectors/kafka/KafkaConsumerTest.java | 152 ++ .../connectors/kafka/KafkaLocalSystemTime.java | 48 + .../connectors/kafka/KafkaProducerTest.java | 114 ++ .../kafka/KafkaTestEnvironmentImpl.java | 337 ++++ .../internals/ZookeeperOffsetHandlerTest.java | 56 + .../src/test/resources/log4j-test.properties | 29 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kafka-0.9/pom.xml | 131 ++ .../connectors/kafka/FlinkKafkaConsumer09.java | 501 ++++++ 
.../connectors/kafka/FlinkKafkaProducer09.java | 130 ++ .../kafka/examples/ReadFromKafka.java | 56 + .../kafka/examples/WriteIntoKafka.java | 70 + .../src/main/resources/log4j.properties | 29 + .../connectors/kafka/Kafka09ITCase.java | 114 ++ .../connectors/kafka/Kafka09ProducerITCase.java | 33 + .../connectors/kafka/KafkaProducerTest.java | 115 ++ .../kafka/KafkaTestEnvironmentImpl.java | 340 ++++ .../src/test/resources/log4j-test.properties | 29 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kafka-base/pom.xml | 169 ++ .../kafka/FlinkKafkaConsumerBase.java | 225 +++ .../kafka/FlinkKafkaProducerBase.java | 291 ++++ .../kafka/internals/KafkaTopicPartition.java | 132 ++ .../internals/KafkaTopicPartitionLeader.java | 129 ++ .../internals/ZooKeeperStringSerializer.java | 51 + .../metrics/AvgKafkaMetricAccumulator.java | 141 ++ .../metrics/DefaultKafkaMetricAccumulator.java | 159 ++ .../metrics/MaxKafkaMetricAccumulator.java | 57 + .../metrics/MinKafkaMetricAccumulator.java | 57 + .../kafka/partitioner/FixedPartitioner.java | 80 + .../kafka/partitioner/KafkaPartitioner.java | 41 + .../KeyedDeserializationSchema.java | 52 + .../KeyedDeserializationSchemaWrapper.java | 51 + .../serialization/KeyedSerializationSchema.java | 48 + .../KeyedSerializationSchemaWrapper.java | 43 + ...eInformationKeyValueSerializationSchema.java | 171 ++ .../KafkaConsumerPartitionAssignmentTest.java | 273 ++++ .../connectors/kafka/KafkaConsumerTestBase.java | 1371 ++++++++++++++++ .../connectors/kafka/KafkaProducerTestBase.java | 187 +++ .../connectors/kafka/KafkaTestBase.java | 170 ++ .../connectors/kafka/KafkaTestEnvironment.java | 83 + .../connectors/kafka/TestFixedPartitioner.java | 104 ++ .../kafka/testutils/DataGenerators.java | 219 +++ .../kafka/testutils/DiscardingSink.java | 33 + .../kafka/testutils/FailingIdentityMapper.java | 115 ++ .../testutils/JobManagerCommunicationUtils.java | 76 + .../kafka/testutils/MockRuntimeContext.java | 157 ++ 
.../testutils/PartitionValidatingMapper.java | 53 + .../kafka/testutils/ThrottledMapper.java | 44 + .../kafka/testutils/Tuple2Partitioner.java | 51 + .../testutils/ValidatingExactlyOnceSink.java | 82 + .../src/test/resources/log4j-test.properties | 29 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kafka/pom.xml | 141 -- .../connectors/kafka/FlinkKafkaConsumer.java | 815 ---------- .../connectors/kafka/FlinkKafkaConsumer081.java | 58 - .../connectors/kafka/FlinkKafkaConsumer082.java | 85 - .../connectors/kafka/FlinkKafkaProducer.java | 340 ---- .../connectors/kafka/internals/Fetcher.java | 81 - .../kafka/internals/KafkaTopicPartition.java | 124 -- .../internals/KafkaTopicPartitionLeader.java | 129 -- .../kafka/internals/LegacyFetcher.java | 648 -------- .../kafka/internals/OffsetHandler.java | 56 - .../kafka/internals/PartitionerWrapper.java | 49 - .../internals/ZooKeeperStringSerializer.java | 51 - .../kafka/internals/ZookeeperOffsetHandler.java | 143 -- .../kafka/partitioner/FixedPartitioner.java | 80 - .../kafka/partitioner/KafkaPartitioner.java | 42 - .../KafkaConsumerPartitionAssignmentTest.java | 273 ---- .../connectors/kafka/KafkaConsumerTest.java | 155 -- .../connectors/kafka/KafkaConsumerTestBase.java | 1475 ------------------ .../streaming/connectors/kafka/KafkaITCase.java | 133 -- .../connectors/kafka/KafkaLocalSystemTime.java | 48 - .../connectors/kafka/KafkaProducerITCase.java | 189 --- .../connectors/kafka/KafkaProducerTest.java | 114 -- .../connectors/kafka/KafkaTestBase.java | 387 ----- .../connectors/kafka/TestFixedPartitioner.java | 104 -- .../internals/ZookeeperOffsetHandlerTest.java | 67 - .../kafka/testutils/DataGenerators.java | 214 --- .../kafka/testutils/DiscardingSink.java | 33 - .../kafka/testutils/FailingIdentityMapper.java | 115 -- .../testutils/JobManagerCommunicationUtils.java | 76 - .../kafka/testutils/MockRuntimeContext.java | 157 -- .../testutils/PartitionValidatingMapper.java | 53 - 
.../kafka/testutils/SuccessException.java | 26 - .../kafka/testutils/ThrottledMapper.java | 44 - .../kafka/testutils/Tuple2Partitioner.java | 51 - .../testutils/ValidatingExactlyOnceSink.java | 81 - .../src/test/resources/log4j-test.properties | 29 - .../src/test/resources/logback-test.xml | 30 - flink-streaming-connectors/pom.xml | 4 +- .../api/environment/CheckpointConfig.java | 2 +- .../KeyedDeserializationSchema.java | 52 - .../KeyedDeserializationSchemaWrapper.java | 51 - .../serialization/KeyedSerializationSchema.java | 48 - .../KeyedSerializationSchemaWrapper.java | 43 - ...eInformationKeyValueSerializationSchema.java | 171 -- .../TypeInformationSerializationSchema.java | 1 - .../EventTimeAllWindowCheckpointingITCase.java | 29 +- .../EventTimeWindowCheckpointingITCase.java | 29 +- .../WindowCheckpointingITCase.java | 27 +- .../flink/test/util/SuccessException.java | 26 + .../org/apache/flink/test/util/TestUtils.java | 52 + pom.xml | 1 + 122 files changed, 9770 insertions(+), 7167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 8e30de7..a73a9d3 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/gen flink-runtime-web/web-dashboard/node_modules/ flink-runtime-web/web-dashboard/bower_components/ atlassian-ide-plugin.xml +out/ http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/docs/apis/streaming/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md index 9d9d6ba..f0757ce 100644 --- a/docs/apis/streaming/connectors/kafka.md +++ b/docs/apis/streaming/connectors/kafka.md @@ -34,14 +34,15 @@ exactly-once processing semantics. 
To achieve that, Flink does not purely rely o offset tracking, but tracks and checkpoints these offsets internally as well. Please pick a package (maven artifact id) and class name for your use-case and environment. -For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is appropriate. +For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is appropriate. <table class="table table-bordered"> <thead> <tr> <th class="text-left">Maven Dependency</th> <th class="text-left">Supported since</th> - <th class="text-left">Class name</th> + <th class="text-left">Consumer and <br> + Producer Class name</th> <th class="text-left">Kafka version</th> <th class="text-left">Notes</th> </tr> @@ -50,17 +51,27 @@ For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is <tr> <td>flink-connector-kafka</td> <td>0.9.1, 0.10</td> - <td>FlinkKafkaConsumer081</td> - <td>0.8.1</td> + <td>FlinkKafkaConsumer082<br> + FlinkKafkaProducer</td> + <td>0.8.x</td> <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td> </tr> - <tr> - <td>flink-connector-kafka</td> - <td>0.9.1, 0.10</td> - <td>FlinkKafkaConsumer082</td> - <td>0.8.2</td> + <tr> + <td>flink-connector-kafka-0.8</td> + <td>1.0.0</td> + <td>FlinkKafkaConsumer08<br> + FlinkKafkaProducer08</td> + <td>0.8.x</td> <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. 
Offsets are committed to ZK by Flink.</td> </tr> + <tr> + <td>flink-connector-kafka-0.9</td> + <td>1.0.0</td> + <td>FlinkKafkaConsumer09<br> + FlinkKafkaProducer09</td> + <td>0.9.x</td> + <td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> Kafka.</td> + </tr> </tbody> </table> @@ -69,7 +80,7 @@ Then, import the connector in your maven project: {% highlight xml %} <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka</artifactId> + <artifactId>flink-connector-kafka-0.8</artifactId> <version>{{site.version }}</version> </dependency> {% endhighlight %} @@ -84,14 +95,16 @@ Note that the streaming connectors are currently not part of the binary distribu #### Kafka Consumer -The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. It takes the following parameters to the constructor: +Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09`). It provides access to one or more Kafka topics. + +The constructor accepts the following arguments: -1. The topic name -2. A DeserializationSchema +1. The topic name / list of topic names +2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the data from Kafka 3. Properties for the Kafka consumer. 
The following properties are required: - "bootstrap.servers" (comma separated list of Kafka brokers) - - "zookeeper.connect" (comma separated list of Zookeeper servers) + - "zookeeper.connect" (comma separated list of Zookeeper servers) (**only required for Kafka 0.8**) - "group.id" the id of the consumer group Example: @@ -101,10 +114,11 @@ Example: {% highlight java %} Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); +// only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env - .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties)) + .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties)) .print(); {% endhighlight %} </div> @@ -112,15 +126,28 @@ DataStream<String> stream = env {% highlight scala %} val properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); +// only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); stream = env - .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties)) + .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)) .print {% endhighlight %} </div> </div> + +##### The `DeserializationSchema` + +The `FlinkKafkaConsumer08` needs to know how to turn the data in Kafka into Java objects. The +`DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)` +method gets called for each Kafka message, passing the value from Kafka. +For accessing both the key and value of the Kafka message, the `KeyedDeserializationSchema` has +the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`. 
+ +For convenience, Flink provides a `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) +which creates a schema based on a Flink `TypeInformation`. + #### Kafka Consumers and Fault Tolerance With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all @@ -155,20 +182,20 @@ If checkpointing is not enabled, the Kafka consumer will periodically commit the #### Kafka Producer -The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns -recors to partitions. +The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns +records to partitions. Example: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema())); +stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema())); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema())) +stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema())) {% endhighlight %} </div> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/docs/apis/streaming/index.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md index 06c0014..9d2481c 100644 --- a/docs/apis/streaming/index.md +++ b/docs/apis/streaming/index.md @@ -1630,7 +1630,7 @@ Collection-based: Custom: - `addSource` - Attache a new source function. For example, to read from Apache Kafka you can use - `addSource(new FlinkKafkaConsumer082<>(...))`. 
See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details. + `addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details. </div> @@ -1682,7 +1682,7 @@ Collection-based: Custom: - `addSource` - Attache a new source function. For example, to read from Apache Kafka you can use - `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details. + `addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details. </div> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 1a4ccae..ddb4a48 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -820,7 +820,7 @@ public class TypeExtractor { validateInfo(typeHierarchy, t, inType); } catch(InvalidTypesException e) { - throw new InvalidTypesException("Input mismatch: " + e.getMessage()); + throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); } } @@ -840,7 +840,7 @@ public class TypeExtractor { validateInfo(typeHierarchy, inType, inTypeInfo); } catch(InvalidTypesException e) { - throw new InvalidTypesException("Input mismatch: " + e.getMessage()); + throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index 4143434..c304abb 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -36,6 +36,8 @@ import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; import scala.Tuple2; import scala.concurrent.Await; @@ -58,6 +60,8 @@ import static com.google.common.base.Preconditions.checkNotNull; @ChannelHandler.Sharable public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> { + private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class); + private static final Charset ENCODING = Charset.forName("UTF-8"); public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address"; @@ -143,12 +147,14 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> { : Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING)); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); + LOG.warn("Error while handling request", e); } catch (Exception e) { byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes)); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); + LOG.warn("Error while handling request", e); } 
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml new file mode 100644 index 0000000..aae4847 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml @@ -0,0 +1,160 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-connectors-parent</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-0.8</artifactId> + <name>flink-connector-kafka-0.8</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.8.2.0</kafka.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + 
<artifactId>scala-reflect</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-annotation</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-curator-recipes</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java new file mode 100644 index 0000000..543e0ff --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.cluster.Broker; +import kafka.common.ErrorMapping; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.TopicMetadataRequest; +import kafka.javaapi.consumer.SimpleConsumer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.util.NetUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. 
+ * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". + * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> + * + * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x). + * Most of Kafka's configuration variables can be used with this consumer as well: + * <ul> + * <li>socket.timeout.ms</li> + * <li>socket.receive.buffer.bytes</li> + * <li>fetch.message.max.bytes</li> + * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li> + * <li>fetch.wait.max.ms</li> + * </ul> + * </li> + * </ul> + * + * <h1>Offset handling</h1> + * + * <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper + * by the offset handler. In addition, the offset handler finds the point where the source initially + * starts reading from the stream, when the streaming job is started.</p> + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer + * has consumed a topic.</p> + * + * <p>If checkpointing is disabled, the consumer will periodically commit the current offset + * to Zookeeper.</p> + * + * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the + * {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p> + * + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer + * is constructed. 
/**
 * The Flink Kafka Consumer for Kafka 0.8.x.
 *
 * <p>Partition metadata for the requested topics is fetched eagerly, in the constructor.
 * That means that the client that submits the program needs to be able to
 * reach the Kafka brokers or ZooKeeper.</p>
 *
 * @param <T> The type of records produced by this consumer.
 */
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {

	// ------------------------------------------------------------------------

	private static final long serialVersionUID = -6272159445203409112L;

	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer08.class);

	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
	public static final long OFFSET_NOT_SET = -915623761776L;

	/** Configuration key for the number of retries for getting the partition info. */
	public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

	/** Default number of retries for getting the partition info. One retry means going through the full list of brokers. */
	public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

	// ------  Configuration of the Consumer -------

	/** List of partitions (including topics and leaders) to consume. */
	private final List<KafkaTopicPartitionLeader> partitionInfos;

	/** The properties to parametrize the Kafka consumer and ZooKeeper client. */
	private final Properties props;

	// ------  Runtime State  -------

	/** The fetcher used to pull data from the Kafka brokers. */
	private transient Fetcher fetcher;

	/** The committer that persists the committed offsets. */
	private transient OffsetHandler offsetHandler;

	/** The partitions actually handled by this consumer at runtime. */
	private transient List<KafkaTopicPartitionLeader> subscribedPartitions;

	/** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
	 * newer than the last offsets (Flink's internal view is fresher). */
	private transient HashMap<KafkaTopicPartition, Long> committedOffsets;

	// ------------------------------------------------------------------------

	/**
	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
	 *
	 * @param topic
	 *           The name of the topic that should be consumed.
	 * @param valueDeserializer
	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
	 * @param props
	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
	 */
	public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		this(Collections.singletonList(topic), valueDeserializer, props);
	}

	/**
	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
	 *
	 * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
	 * pairs, offsets, and topic names from Kafka.
	 *
	 * @param topic
	 *           The name of the topic that should be consumed.
	 * @param deserializer
	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
	 * @param props
	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
	 */
	public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
		this(Collections.singletonList(topic), deserializer, props);
	}

	/**
	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
	 *
	 * This constructor allows passing multiple topics to the consumer.
	 *
	 * @param topics
	 *           The Kafka topics to read from.
	 * @param deserializer
	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
	 * @param props
	 *           The properties that are used to configure both the fetcher and the offset handler.
	 */
	public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
	}

	/**
	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
	 *
	 * This constructor allows passing multiple topics and a key/value deserialization schema.
	 *
	 * @param topics
	 *           The Kafka topics to read from.
	 * @param deserializer
	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
	 * @param props
	 *           The properties that are used to configure both the fetcher and the offset handler.
	 */
	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
		super(deserializer, props);

		checkNotNull(topics, "topics");
		this.props = checkNotNull(props, "props");

		// validate the zookeeper properties
		validateZooKeeperConfig(props);

		// Connect to a broker to get the partitions for all topics
		this.partitionInfos = getPartitionsForTopic(topics, props);

		if (partitionInfos.size() == 0) {
			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics.toString() + "." +
					"Please check previous log entries");
		}

		if (LOG.isInfoEnabled()) {
			logPartitionInfo(KafkaTopicPartition.convertToPartitionInfo(partitionInfos));
		}
	}

	// ------------------------------------------------------------------------
	//  Source life cycle
	// ------------------------------------------------------------------------

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);

		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
		final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask();

		// pick which partitions we work on
		subscribedPartitions = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);

		if (LOG.isInfoEnabled()) {
			LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
					thisConsumerIndex, KafkaTopicPartitionLeader.toString(subscribedPartitions), this.partitionInfos.size());
		}

		// we leave the fetcher as null, if we have no partitions
		if (subscribedPartitions.isEmpty()) {
			LOG.info("Kafka consumer {} has no partitions (empty source)", thisConsumerIndex);
			this.fetcher = null; // fetcher remains null
			return;
		}

		// create fetcher
		fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName());

		// offset handling
		offsetHandler = new ZookeeperOffsetHandler(props);

		committedOffsets = new HashMap<>();

		// seek to last known pos, from restore request
		if (restoreToOffset != null) {
			if (LOG.isInfoEnabled()) {
				LOG.info("Consumer {} is restored from previous checkpoint: {}",
						thisConsumerIndex, KafkaTopicPartition.toString(restoreToOffset));
			}

			for (Map.Entry<KafkaTopicPartition, Long> restorePartition: restoreToOffset.entrySet()) {
				// seek fetcher to restore position
				// we set the offset +1 here, because seek() is accepting the next offset to read,
				// but the restore offset is the last read offset
				fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1);
			}
			// initialize offsets with restored state
			this.offsetsState = restoreToOffset;
			restoreToOffset = null;
		}
		else {
			// start with empty offsets
			offsetsState = new HashMap<>();

			// no restore request. Let the offset handler take care of the initial offset seeking
			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
		}
	}

	@Override
	public void run(SourceContext<T> sourceContext) throws Exception {
		if (fetcher != null) {
			// For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
			PeriodicOffsetCommitter<T> offsetCommitter = null;

			// check whether we need to start the periodic checkpoint committer
			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
			if (!streamingRuntimeContext.isCheckpointingEnabled()) {
				// we use Kafka's own configuration parameter key for this.
				// Note that the default configuration value in Kafka is 60 * 1000, so we use the
				// same here.
				long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
				offsetCommitter = new PeriodicOffsetCommitter<>(commitInterval, this);
				offsetCommitter.setDaemon(true);
				offsetCommitter.start();
				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
			}

			try {
				fetcher.run(sourceContext, deserializer, offsetsState);
			} finally {
				if (offsetCommitter != null) {
					offsetCommitter.close();
					try {
						offsetCommitter.join();
					} catch(InterruptedException ie) {
						// ignore interrupt
					}
				}
			}
		}
		else {
			// this source never completes, so emit a Long.MAX_VALUE watermark
			// to not block watermark forwarding
			if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
			}

			final Object waitLock = new Object();
			while (running) {
				// wait until we are canceled
				try {
					//noinspection SynchronizationOnLocalVariableOrMethodParameter
					synchronized (waitLock) {
						waitLock.wait();
					}
				}
				catch (InterruptedException e) {
					// do nothing, check our "running" status
				}
			}
		}

		// close the context after the work was done. this can actually only
		// happen when the fetcher decides to stop fetching
		sourceContext.close();
	}

	@Override
	public void cancel() {
		// set ourselves as not running
		running = false;

		// close the fetcher to interrupt any work
		Fetcher fetcher = this.fetcher;
		this.fetcher = null;
		if (fetcher != null) {
			try {
				fetcher.close();
			}
			catch (IOException e) {
				LOG.warn("Error while closing Kafka connector data fetcher", e);
			}
		}

		OffsetHandler offsetHandler = this.offsetHandler;
		this.offsetHandler = null;
		if (offsetHandler != null) {
			try {
				offsetHandler.close();
			}
			catch (IOException e) {
				LOG.warn("Error while closing Kafka connector offset handler", e);
			}
		}
	}

	@Override
	public void close() throws Exception {
		cancel();
		super.close();
	}

	// ------------------------------------------------------------------------
	//  Checkpoint and restore
	// ------------------------------------------------------------------------

	/**
	 * Utility method to commit offsets.
	 *
	 * <p>Only offsets newer than the last committed ones (tracked in {@code committedOffsets})
	 * are forwarded to the offset handler; partitions without consumed data are skipped.</p>
	 *
	 * @param toCommit the offsets to commit
	 * @throws Exception
	 */
	protected void commitOffsets(HashMap<KafkaTopicPartition, Long> toCommit) throws Exception {
		Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
		for (KafkaTopicPartitionLeader tp : this.subscribedPartitions) {
			Long offset = toCommit.get(tp.getTopicPartition());
			if (offset == null) {
				// There was no data ever consumed from this topic, that's why there is no entry
				// for this topicPartition in the map.
				continue;
			}
			Long lastCommitted = this.committedOffsets.get(tp.getTopicPartition());
			if (lastCommitted == null) {
				lastCommitted = OFFSET_NOT_SET;
			}
			if (offset != OFFSET_NOT_SET) {
				if (offset > lastCommitted) {
					offsetsToCommit.put(tp.getTopicPartition(), offset);
					this.committedOffsets.put(tp.getTopicPartition(), offset);
					LOG.debug("Committing offset {} for partition {}", offset, tp.getTopicPartition());
				} else {
					LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp.getTopicPartition());
				}
			}
		}

		if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) {
			LOG.debug("Committing offsets {} to Zookeeper", KafkaTopicPartition.toString(offsetsToCommit));
		}

		this.offsetHandler.commit(offsetsToCommit);
	}

	// ------------------------------------------------------------------------
	//  Miscellaneous utilities
	// ------------------------------------------------------------------------

	/**
	 * Thread to periodically commit the current read offset into Zookeeper.
	 * Only started when checkpointing is disabled; on any unexpected error it
	 * stops the consumer's fetcher with that error.
	 */
	private static class PeriodicOffsetCommitter<T> extends Thread {
		// interval between two commit attempts, in milliseconds
		private final long commitInterval;
		// the consumer whose offsets are committed
		private final FlinkKafkaConsumer08<T> consumer;
		private volatile boolean running = true;

		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer08<T> consumer) {
			this.commitInterval = commitInterval;
			this.consumer = consumer;
		}

		@Override
		public void run() {
			try {

				while (running) {
					try {
						Thread.sleep(commitInterval);
						// ------------  commit current offsets ----------------

						// create copy of current offsets
						@SuppressWarnings("unchecked")
						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.offsetsState.clone();
						consumer.commitOffsets(currentOffsets);
					} catch (InterruptedException e) {
						if (running) {
							// throw unexpected interruption
							throw e;
						}
					}
				}
			} catch (Throwable t) {
				LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
				consumer.fetcher.stopWithError(t);
			}
		}

		public void close() {
			this.running = false;
			this.interrupt();
		}

	}


	// ------------------------------------------------------------------------
	//  Kafka / ZooKeeper communication utilities
	// ------------------------------------------------------------------------

	/**
	 * Send request to Kafka to get partitions for topic.
	 *
	 * <p>Seed brokers are tried in random starting order to spread the load of parallel
	 * source instances; the whole broker list is retried up to the configured number of times.</p>
	 *
	 * @param topics The name of the topics.
	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
	 */
	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(final List<String> topics, final Properties properties) {
		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
		final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));

		checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
		String[] seedBrokers = seedBrokersConfString.split(",");
		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();

		Random rnd = new Random();
		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
			// parallel source instances start. Still, we try all available brokers.
			int index = rnd.nextInt(seedBrokers.length);
			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
				String seedBroker = seedBrokers[index];
				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
				if (++index == seedBrokers.length) {
					index = 0;
				}

				URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
				SimpleConsumer consumer = null;
				try {
					final String clientId = "flink-kafka-consumer-partition-lookup";
					final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
					final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);

					TopicMetadataRequest req = new TopicMetadataRequest(topics);
					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

					List<TopicMetadata> metaData = resp.topicsMetadata();

					// clear in case we have an incomplete list from previous tries
					partitions.clear();
					for (TopicMetadata item : metaData) {
						if (item.errorCode() != ErrorMapping.NoError()) {
							if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
								// fail hard if topic is unknown
								throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
							}
							// warn and try more brokers
							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
									"for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
							continue brokersLoop;
						}
						if (!topics.contains(item.topic())) {
							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
							continue brokersLoop;
						}
						for (PartitionMetadata part : item.partitionsMetadata()) {
							Node leader = brokerToNode(part.leader());
							KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
							KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
							partitions.add(pInfo);
						}
					}
					break retryLoop; // leave the loop through the brokers
				} catch (Exception e) {
					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + ". Message: " + e.getMessage());
					LOG.debug("Detailed trace", e);
				} finally {
					if (consumer != null) {
						consumer.close();
					}
				}
			} // brokers loop
		} // retries loop
		return partitions;
	}

	/**
	 * Turn a broker instance into a node instance
	 * @param broker broker instance
	 * @return Node representing the given broker
	 */
	private static Node brokerToNode(Broker broker) {
		return new Node(broker.id(), broker.host(), broker.port());
	}

	/**
	 * Validate the ZK configuration, checking for required parameters
	 * @param props Properties to check
	 */
	protected static void validateZooKeeperConfig(Properties props) {
		if (props.getProperty("zookeeper.connect") == null) {
			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
		}
		if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
			throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
					+ "' has not been set in the properties");
		}

		try {
			//noinspection ResultOfMethodCallIgnored
			Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
		}
		catch (NumberFormatException e) {
			throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
		}

		try {
			//noinspection ResultOfMethodCallIgnored
			Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
		}
		catch (NumberFormatException e) {
			throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.util.Properties;

/**
 * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
 *
 * @deprecated Use {@link FlinkKafkaConsumer08} instead.
 */
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {

	private static final long serialVersionUID = -5649906773771949146L;

	/**
	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
	 *
	 * @deprecated Use {@link FlinkKafkaConsumer08#FlinkKafkaConsumer08(String, DeserializationSchema, Properties)} instead.
	 */
	@Deprecated
	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		super(topic, valueDeserializer, props);
	}
}

// ---------------------------------------------------------------------------
// File: FlinkKafkaConsumer082.java
// ---------------------------------------------------------------------------

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.util.Properties;

/**
 * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
 *
 * @deprecated Use {@link FlinkKafkaConsumer08} instead.
 */
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {

	private static final long serialVersionUID = -5649906773771949146L;

	/**
	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
	 *
	 * @deprecated Use {@link FlinkKafkaConsumer08#FlinkKafkaConsumer08(String, DeserializationSchema, Properties)} instead.
	 */
	@Deprecated
	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		super(topic, valueDeserializer, props);
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import java.util.Properties;


/**
 * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead.
 *
 * @deprecated Use {@link FlinkKafkaProducer08} instead.
 */
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {

	// Serializable class (Flink sink function): declare an explicit serialVersionUID,
	// consistent with the other deprecated shims (FlinkKafkaConsumer081/082).
	private static final long serialVersionUID = 1L;

	/** @deprecated Use {@link FlinkKafkaProducer08} instead. */
	@Deprecated
	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
	}

	/** @deprecated Use {@link FlinkKafkaProducer08} instead. */
	@Deprecated
	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
	}

	/** @deprecated Use {@link FlinkKafkaProducer08} instead. */
	@Deprecated
	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
	}

	/** @deprecated Use {@link FlinkKafkaProducer08} instead. */
	@Deprecated
	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
	}

	/** @deprecated Use {@link FlinkKafkaProducer08} instead. */
	@Deprecated
	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
		super(topicId, serializationSchema, producerConfig, null);
	}

	/** @deprecated Use {@link FlinkKafkaProducer08} instead. */
	@Deprecated
	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
		super(topicId, serializationSchema, producerConfig, customPartitioner);
	}

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.util.Properties;


/**
 * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
 *
 * Please note that this producer does not have any reliability guarantees.
 *
 * @param <IN> Type of the messages to write into Kafka.
 */
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {

	private static final long serialVersionUID = 1L;

	// ------------------- Keyless serialization schema constructors ----------------------
	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined (keyless) serialization schema.
	 */
	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined (keyless) serialization schema.
	 * @param producerConfig
	 * 			Properties with the producer configuration.
	 */
	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
	}

	/**
	 * The main constructor for creating a FlinkKafkaProducer.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
	 */
	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
	}

	// ------------------- Key/Value serialization schema constructors ----------------------

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined serialization schema supporting key/value messages
	 */
	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined serialization schema supporting key/value messages
	 * @param producerConfig
	 * 			Properties with the producer configuration.
	 */
	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
	}

	/**
	 * The main constructor for creating a FlinkKafkaProducer.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
	 */
	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
		super(topicId, serializationSchema, producerConfig, customPartitioner);
	}

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import java.io.IOException;
import java.util.HashMap;

/**
 * A fetcher pulls data from Kafka, from a fixed set of partitions.
 * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
 */
public interface Fetcher {

	/**
	 * Closes the fetcher. This will stop any operation in the
	 * {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually
	 * close underlying connections and release all resources.
	 */
	void close() throws IOException;

	/**
	 * Starts fetching data from Kafka and emitting it into the stream.
	 *
	 * <p>To provide exactly-once guarantees, the fetcher needs to emit a record and the update
	 * of the last consumed offset in one atomic operation:</p>
	 * <pre>{@code
	 *
	 * while (running) {
	 *     T next = ...
	 *     long offset = ...
	 *     int partition = ...
	 *     synchronized (sourceContext.getCheckpointLock()) {
	 *         sourceContext.collect(next);
	 *         lastOffsets[partition] = offset;
	 *     }
	 * }
	 * }</pre>
	 *
	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
	 * @param sourceContext The source context to emit elements to.
	 * @param valueDeserializer The deserializer to decode the raw values with.
	 * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state)
	 */
	<T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer,
				HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception;

	/**
	 * Set the next offset to read from for the given partition.
	 * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
	 * will be the message with <i>offset=n</i>.
	 *
	 * @param topicPartition The partition for which to seek the offset.
	 * @param offsetToRead The offset to seek to.
	 */
	void seek(KafkaTopicPartition topicPartition, long offsetToRead);

	/**
	 * Exit run loop with given error and release all resources.
	 *
	 * @param t Error cause
	 */
	void stopWithError(Throwable t);
}
