[FLINK-4035] Add support for Kafka 0.10.x. This closes #2231
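For context, below is a minimal usage sketch of the connector classes introduced by this commit (FlinkKafkaConsumer010, FlinkKafkaProducer010). It is illustrative only and not part of the commit itself; the broker address "localhost:9092", group id "my-group", and the topic names are placeholder values.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka010Example {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // offsets are snapshotted as part of Flink's checkpoints (see consumer Javadoc below)
            env.enableCheckpointing(5000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker list
            props.setProperty("group.id", "my-group");                // placeholder group id
            // optional connector-specific key defined in FlinkKafkaConsumer010 (default is 100 ms)
            props.setProperty(FlinkKafkaConsumer010.KEY_POLL_TIMEOUT, "100");

            // read from Kafka 0.10 using the new consumer added in this commit
            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

            // write back to Kafka 0.10 using the new producer added in this commit
            stream.addSink(
                    new FlinkKafkaProducer010<>("localhost:9092", "output-topic", new SimpleStringSchema()));

            env.execute("Kafka 0.10 example");
        }
    }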
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63859c64 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63859c64 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63859c64 Branch: refs/heads/master Commit: 63859c648a2a0ff024228f6e0687d837b8896322 Parents: a079259 Author: radekg <ra...@gruchalski.com> Authored: Tue Jul 12 13:19:01 2016 -0400 Committer: Robert Metzger <rmetz...@apache.org> Committed: Tue Oct 11 10:04:25 2016 +0200 ---------------------------------------------------------------------- .../flink-connector-kafka-0.10/pom.xml | 179 ++++++++++ .../connectors/kafka/FlinkKafkaConsumer010.java | 259 +++++++++++++++ .../connectors/kafka/FlinkKafkaProducer010.java | 137 ++++++++ .../kafka/Kafka010JsonTableSource.java | 71 ++++ .../connectors/kafka/Kafka010TableSource.java | 75 +++++ .../kafka/internal/Kafka010Fetcher.java | 312 +++++++++++++++++ .../src/main/resources/log4j.properties | 29 ++ .../connectors/kafka/Kafka010ITCase.java | 192 +++++++++++ .../kafka/Kafka010ProducerITCase.java | 33 ++ .../connectors/kafka/KafkaProducerTest.java | 119 +++++++ .../kafka/KafkaShortRetention010ITCase.java | 34 ++ .../kafka/KafkaTestEnvironmentImpl.java | 331 +++++++++++++++++++ .../src/test/resources/log4j-test.properties | 30 ++ .../src/test/resources/logback-test.xml | 30 ++ flink-streaming-connectors/pom.xml | 1 + 15 files changed, 1832 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml new file mode 100644 index 0000000..f2bcb11 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml @@ -0,0 +1,179 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-connectors</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-0.10_2.10</artifactId> + <name>flink-connector-kafka-0.10</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.10.0.0</kafka.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, + won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude 0.8 dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <!-- include 0.10 server for tests --> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-test-sources</id> + <goals> + <goal>test-jar-no-fork</goal> + </goals> + 
<configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java new file mode 100644 index 0000000..78ccd4a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. + * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". + * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer + * has consumed a topic.</p> + * + * <p>Please refer to Kafka's documentation for the available configuration properties: + * http://kafka.apache.org/documentation.html#newconsumerconfigs</p> + * + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer + * is constructed. That means that the client that submits the program needs to be able to + * reach the Kafka brokers or ZooKeeper.</p> + */ +public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> { + + private static final long serialVersionUID = 2324564345203409112L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class); + + /** Configuration key to change the polling timeout **/ + public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; + + /** Boolean configuration key to disable metrics tracking **/ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now. 
*/ + public static final long DEFAULT_POLL_TIMEOUT = 100L; + + // ------------------------------------------------------------------------ + + /** User-supplied properties for Kafka **/ + private final Properties properties; + + /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now */ + private final long pollTimeout; + + // ------------------------------------------------------------------------ + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * @param topic + * The name of the topic that should be consumed. + * @param valueDeserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + this(Collections.singletonList(topic), valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param topic + * The name of the topic that should be consumed. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { + this(Collections.singletonList(topic), deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * This constructor allows passing multiple topics to the consumer. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { + this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. 
+ */ + public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(deserializer); + + checkNotNull(topics, "topics"); + this.properties = checkNotNull(props, "props"); + setDeserializer(this.properties); + + // configure the polling timeout + try { + if (properties.containsKey(KEY_POLL_TIMEOUT)) { + this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); + } else { + this.pollTimeout = DEFAULT_POLL_TIMEOUT; + } + } + catch (Exception e) { + throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); + } + + // read the partitions that belong to the listed topics + final List<KafkaTopicPartition> partitions = new ArrayList<>(); + + try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) { + for (final String topic: topics) { + // get partitions for each topic + List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic); + // for non existing topics, the list might be null. + if (partitionsForTopic != null) { + partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); + } + } + } + + if (partitions.isEmpty()) { + throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); + } + + // we now have a list of partitions which is the same for all parallel consumer instances. + LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics); + + if (LOG.isInfoEnabled()) { + logPartitionInfo(LOG, partitions); + } + + // register these partitions + setSubscribedPartitions(partitions); + } + + @Override + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + + boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); + + return new Kafka010Fetcher<>(sourceContext, thisSubtaskPartitions, + watermarksPeriodic, watermarksPunctuated, + runtimeContext, deserializer, + properties, pollTimeout, useMetrics); + + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable) + * + * @param partitions A list of Kafka PartitionInfos. + * @return A list of KafkaTopicPartitions + */ + private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { + checkNotNull(partitions); + + List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size()); + for (PartitionInfo pi : partitions) { + ret.add(new KafkaTopicPartition(pi.topic(), pi.partition())); + } + return ret; + } + + /** + * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties. + * + * @param props The Kafka properties to register the serializer in. 
+ */ + private static void setDeserializer(Properties props) { + final String deSerName = ByteArrayDeserializer.class.getCanonicalName(); + + Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + + if (keyDeSer != null && !keyDeSer.equals(deSerName)) { + LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + if (valDeSer != null && !valDeSer.equals(deSerName)) { + LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + } + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java new file mode 100644 index 0000000..49bce39 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8. + * + * Please note that this producer does not have any reliability guarantees. + * + * @param <IN> Type of the messages to write into Kafka. + */ +public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { + + private static final long serialVersionUID = 1L; + + // ------------------- Keyless serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. 
+ * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + */ + public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + */ + public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + */ + public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + */ + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + super(topicId, serializationSchema, producerConfig, customPartitioner); + } + + @Override + protected void flush() { + if (this.producer != null) { + producer.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java new file mode 100644 index 0000000..cda68ce --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.10. + */ +public class Kafka010JsonTableSource extends KafkaJsonTableSource { + + /** + * Creates a Kafka 0.10 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka010JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.10 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka010JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java new file mode 100644 index 0000000..cee1b90 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.10. + */ +public class Kafka010TableSource extends KafkaTableSource { + + /** + * Creates a Kafka 0.10 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka010TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.10 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka010TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java new file mode 100644 index 0000000..70f530b --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API. + * + * @param <T> The type of elements produced by the fetcher. 
+ */ +public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(Kafka010Fetcher.class); + + // ------------------------------------------------------------------------ + + /** The schema to convert between Kafka's byte messages, and Flink's objects */ + private final KeyedDeserializationSchema<T> deserializer; + + /** The subtask's runtime context */ + private final RuntimeContext runtimeContext; + + /** The configuration for the Kafka consumer */ + private final Properties kafkaProperties; + + /** The maximum number of milliseconds to wait for a fetch batch */ + private final long pollTimeout; + + /** Flag whether to register Kafka metrics as Flink accumulators */ + private final boolean forwardKafkaMetrics; + + /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */ + private final Object consumerLock = new Object(); + + /** Reference to the Kafka consumer, once it is created */ + private volatile KafkaConsumer<byte[], byte[]> consumer; + + /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */ + private volatile ExceptionProxy errorHandler; + + /** Flag to mark the main work loop as alive */ + private volatile boolean running = true; + + // ------------------------------------------------------------------------ + + public Kafka010Fetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext, + KeyedDeserializationSchema<T> deserializer, + Properties kafkaProperties, + long pollTimeout, + boolean forwardKafkaMetrics) throws Exception + { + super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext); + + this.deserializer = deserializer; + this.runtimeContext = runtimeContext; + this.kafkaProperties = kafkaProperties; + this.pollTimeout = pollTimeout; + this.forwardKafkaMetrics = forwardKafkaMetrics; + + // if checkpointing is enabled, we are not automatically committing to Kafka. + kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + Boolean.toString(!runtimeContext.isCheckpointingEnabled())); + } + + // ------------------------------------------------------------------------ + // Fetcher work methods + // ------------------------------------------------------------------------ + + @Override + public void runFetchLoop() throws Exception { + this.errorHandler = new ExceptionProxy(Thread.currentThread()); + + // rather than running the main fetch loop directly here, we spawn a dedicated thread + // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code + Thread runner = new Thread(this, "Kafka 0.10 Fetcher for " + runtimeContext.getTaskNameWithSubtasks()); + runner.setDaemon(true); + runner.start(); + + try { + runner.join(); + } catch (InterruptedException e) { + // may be the result of a wake-up after an exception. 
we ignore this here and only + // restore the interruption state + Thread.currentThread().interrupt(); + } + + // make sure we propagate any exception that occurred in the concurrent fetch thread, + // before leaving this method + this.errorHandler.checkAndThrowException(); + } + + @Override + public void cancel() { + // flag the main thread to exit + running = false; + + // NOTE: + // - We cannot interrupt the runner thread, because the Kafka consumer may + // deadlock when the thread is interrupted while in certain methods + // - We cannot call close() on the consumer, because it will actually throw + // an exception if a concurrent call is in progress + + // make sure the consumer finds out faster that we are shutting down + if (consumer != null) { + consumer.wakeup(); + } + } + + @Override + public void run() { + // This method initializes the KafkaConsumer and guarantees it is torn down properly. + // This is important, because the consumer has multi-threading issues, + // including concurrent 'close()' calls. + + final KafkaConsumer<byte[], byte[]> consumer; + try { + consumer = new KafkaConsumer<>(kafkaProperties); + } + catch (Throwable t) { + running = false; + errorHandler.reportError(t); + return; + } + + // from here on, the consumer will be closed properly + try { + consumer.assign(convertKafkaPartitions(subscribedPartitions())); + + // register Kafka metrics to Flink accumulators + if (forwardKafkaMetrics) { + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Consumer implementation does not support metrics"); + } else { + // we have metrics, register them where possible + for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) { + String name = "KafkaConsumer-" + metric.getKey().name(); + DefaultKafkaMetricAccumulator kafkaAccumulator = + DefaultKafkaMetricAccumulator.createFor(metric.getValue()); + + // best effort: we only add the accumulator if available. + if (kafkaAccumulator != null) { + runtimeContext.addAccumulator(name, kafkaAccumulator); + } + } + } + } + + // seek the consumer to the initial offsets + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { + if (partition.isOffsetDefined()) { + consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); + } + } + + // from now on, external operations may call the consumer + this.consumer = consumer; + + // main fetch loop + while (running) { + // get the next batch of records + final ConsumerRecords<byte[], byte[]> records; + synchronized (consumerLock) { + try { + records = consumer.poll(pollTimeout); + } + catch (WakeupException we) { + if (running) { + throw we; + } else { + continue; + } + } + } + + // get the records for each topic partition + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { + + List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle()); + + for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { + T value = deserializer.deserialize( + record.key(), record.value(), + record.topic(), record.partition(), record.offset()); + + if (deserializer.isEndOfStream(value)) { + // end of stream signaled + running = false; + break; + } + + // emit the actual record. 
this also update offset state atomically + // and deals with timestamps and watermark generation + emitRecord(value, partition, record.offset()); + } + } + } + // end main fetch loop + } + catch (Throwable t) { + if (running) { + running = false; + errorHandler.reportError(t); + } else { + LOG.debug("Stopped ConsumerThread threw exception", t); + } + } + finally { + try { + synchronized (consumerLock) { + consumer.close(); + } + } catch (Throwable t) { + LOG.warn("Error while closing Kafka 0.10 consumer", t); + } + } + } + + // ------------------------------------------------------------------------ + // Kafka 0.10 specific fetcher behavior + // ------------------------------------------------------------------------ + + @Override + public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) { + return new TopicPartition(partition.getTopic(), partition.getPartition()); + } + + @Override + public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { + KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions(); + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length); + + for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { + Long offset = offsets.get(partition.getKafkaTopicPartition()); + if (offset != null) { + offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, "")); + } + } + + if (this.consumer != null) { + synchronized (consumerLock) { + this.consumer.commitSync(offsetsToCommit); + } + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + public static Collection<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) { + ArrayList<TopicPartition> result = new ArrayList<>(partitions.length); + for (KafkaTopicPartitionState<TopicPartition> p : partitions) { + result.add(p.getKafkaPartitionHandle()); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java new file mode 100644 index 0000000..5427853 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.junit.Test; + +import java.util.UUID; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + +public class Kafka010ITCase extends KafkaConsumerTestBase { + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + @Override + public String getExpectedKafkaVersion() { + return "0.10"; + } + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumer() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(false); +// } + +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(true); +// } + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + @Test(timeout = 60000) + public void testFailOnDeploy() throws Exception { + runFailOnDeployTest(); + } + + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- special executions --- + + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runProduceConsumeMultipleTopics(); + } + + @Test(timeout = 60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout = 60000) + public void testMetricsAndEndOfStream() throws Exception { + runMetricsAndEndOfStreamTest(); + } + + @Test + public void testJsonTableSource() throws Exception { + String topic = UUID.randomUUID().toString(); + + // Names and types are determined in the actual test method of the + // base test class. 
+ Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource( + topic, + standardProps, + new String[] { + "long", + "string", + "boolean", + "double", + "missing-field"}, + new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }); + + // Don't fail on missing field, but set to null (default) + tableSource.setFailOnMissingField(false); + + runJsonTableSource(topic, tableSource); + } + + @Test + public void testJsonTableSourceWithFailOnMissingField() throws Exception { + String topic = UUID.randomUUID().toString(); + + // Names and types are determined in the actual test method of the + // base test class. + Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource( + topic, + standardProps, + new String[] { + "long", + "string", + "boolean", + "double", + "missing-field"}, + new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO }); + + // Don't fail on missing field, but set to null (default) + tableSource.setFailOnMissingField(true); + + try { + runJsonTableSource(topic, tableSource); + fail("Did not throw expected Exception"); + } catch (Exception e) { + Throwable rootCause = e.getCause().getCause().getCause(); + assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java new file mode 100644 index 0000000..42b9682 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + + +import org.junit.Test; + + +@SuppressWarnings("serial") +public class Kafka010ProducerITCase extends KafkaProducerTestBase { + + @Test + public void testCustomPartitioning() { + runCustomPartitioningTest(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..5f5ac63 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.util.TestLogger; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(FlinkKafkaProducerBase.class) +public class KafkaProducerTest extends TestLogger { + + @Test + @SuppressWarnings("unchecked") + public void testPropagateExceptions() { + try { + // mock kafka producer + KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class); + + // partition setup + when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( + Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); + + // failure when trying to send an element + when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) + .thenAnswer(new Answer<Future<RecordMetadata>>() { + @Override + public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { + Callback callback = (Callback) invocation.getArguments()[1]; + callback.onCompletion(null, new Exception("Test error")); + return null; + } + }); + + // make sure the FlinkKafkaProducer instantiates our mock producer + whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); + + // (1) producer that propagates errors + + FlinkKafkaProducer010<String> producerPropagating = new FlinkKafkaProducer010<>( + "mock_topic", new SimpleStringSchema(), new Properties(), null); + + producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3)); + producerPropagating.open(new Configuration()); + + try { + producerPropagating.invoke("value"); + producerPropagating.invoke("value"); + fail("This should fail with an exception"); + } + catch (Exception e) { + assertNotNull(e.getCause()); + assertNotNull(e.getCause().getMessage()); + assertTrue(e.getCause().getMessage().contains("Test error")); + } + + // (2) producer that only logs errors + + FlinkKafkaProducer010<String> producerLogging = new FlinkKafkaProducer010<>( + "mock_topic", new SimpleStringSchema(), new Properties(), null); + producerLogging.setLogFailuresOnly(true); + + producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3)); + producerLogging.open(new Configuration()); + + producerLogging.invoke("value"); + producerLogging.invoke("value"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} 
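As a side note on the failure-handling modes exercised by KafkaProducerTest above: the producer either propagates asynchronous send errors or only logs them, controlled via setLogFailuresOnly() on the producer base class. The sketch below shows how a user would toggle this on the new 0.10 producer; it is not part of the commit, and the broker address and topic name are placeholders.

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerFailureModeSketch {

        public static FlinkKafkaProducer010<String> createProducer() {
            Properties producerConfig = new Properties();
            // placeholder broker address, not taken from the commit
            producerConfig.setProperty("bootstrap.servers", "localhost:9092");

            FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
                    "output-topic", new SimpleStringSchema(), producerConfig);

            // false (default): asynchronous send errors are propagated and fail the job,
            //                  as in the "producerPropagating" case of the test above
            // true:            errors are only logged, as in the "producerLogging" case
            producer.setLogFailuresOnly(true);
            return producer;
        }
    }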
http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java new file mode 100644 index 0000000..1d36198 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.Test; + +@SuppressWarnings("serial") +public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase { + + @Test(timeout=60000) + public void testAutoOffsetReset() throws Exception { + runAutoOffsetResetTest(); + } + + @Test(timeout=60000) + public void testAutoOffsetResetNone() throws Exception { + runFailOnAutoOffsetResetNone(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java new file mode 100644 index 0000000..45f0478 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.admin.AdminUtils; +import kafka.common.KafkaException; +import kafka.network.SocketServer; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.SystemTime$; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.requests.MetadataResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.BindException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 0.10 + */ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private List<KafkaServer> brokers; + private TestingServer zookeeper; + private String zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private Properties additionalServerProperties; + + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public String getVersion() { + return "0.10"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + @Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new FlinkKafkaConsumer010<>(topics, readSchema, props); + } + + @Override + public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return prod; + } + + @Override + public void restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + ZkUtils zkUtils = getZkUtils(); + try { + MetadataResponse.PartitionMetadata firstPart = null; + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.error().code()); + // not the first try. 
Sleep a bit + Thread.sleep(150); + } + + List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata(); + firstPart = partitionMetadata.get(0); + } + while (firstPart.error().code() != 0); + + return firstPart.leader().id(); + } finally { + zkUtils.close(); + } + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.config().brokerId(); + } + + @Override + public void prepare(int numKafkaServers, Properties additionalServerProperties) { + this.additionalServerProperties = additionalServerProperties; + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(numKafkaServers); + for (int i = 0; i < numKafkaServers; i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + LOG.info("Starting Zookeeper"); + zookeeper = new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(numKafkaServers); + + for (int i = 0; i < numKafkaServers; i++) { + brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); + + SocketServer socketServer = brokers.get(i).socketServer(); + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("auto.commit.enable", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. + standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value) + standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) 
+ } + + @Override + public void shutdown() { + for (KafkaServer broker : brokers) { + if (broker != null) { + broker.shutdown(); + } + } + brokers.clear(); + + if (zookeeper != null) { + try { + zookeeper.stop(); + } + catch (Exception e) { + LOG.warn("ZK.stop() failed", e); + } + zookeeper = null; + } + + // clean up the temp spaces + + if (tmpKafkaParent != null && tmpKafkaParent.exists()) { + try { + FileUtils.deleteDirectory(tmpKafkaParent); + } + catch (Exception e) { + // ignore + } + } + if (tmpZkDir != null && tmpZkDir.exists()) { + try { + FileUtils.deleteDirectory(tmpZkDir); + } + catch (Exception e) { + // ignore + } + } + } + + public ZkUtils getZkUtils() { + ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); + return ZkUtils.apply(creator, false); + } + + @Override + public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) { + // create topic with one client + LOG.info("Creating topic {}", topic); + + ZkUtils zkUtils = getZkUtils(); + try { + AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, new kafka.admin.RackAwareMode.Enforced$()); + } finally { + zkUtils.close(); + } + + // validate that the topic has been created + final long deadline = System.currentTimeMillis() + 30000; + do { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // restore interrupted state + } + // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are + // not always correct. + + // create a new ZK utils connection + ZkUtils checkZKConn = getZkUtils(); + if(AdminUtils.topicExists(checkZKConn, topic)) { + checkZKConn.close(); + return; + } + checkZKConn.close(); + } + while (System.currentTimeMillis() < deadline); + fail("Test topic could not be created"); + } + + @Override + public void deleteTestTopic(String topic) { + ZkUtils zkUtils = getZkUtils(); + try { + LOG.info("Deleting topic {}", topic); + + ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); + + AdminUtils.deleteTopic(zkUtils, topic); + + zk.close(); + } finally { + zkUtils.close(); + } + } + + /** + * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) + */ + protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception { + Properties kafkaProperties = new Properties(); + + // properties have to be Strings + kafkaProperties.put("advertised.host.name", KAFKA_HOST); + kafkaProperties.put("broker.id", Integer.toString(brokerId)); + kafkaProperties.put("log.dir", tmpFolder.toString()); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); + kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); + + // for CI stability, increase zookeeper session timeout + kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); + kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); + if(additionalServerProperties != null) { + kafkaProperties.putAll(additionalServerProperties); + } + + final int numTries = 5; + + for (int i = 1; i <= numTries; 
i++) { + int kafkaPort = NetUtils.getAvailablePort(); + kafkaProperties.put("port", Integer.toString(kafkaPort)); + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + try { + scala.Option<String> stringNone = scala.Option.apply(null); + KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone); + server.startup(); + return server; + } + catch (KafkaException e) { + if (e.getCause() instanceof BindException) { + // port conflict, retry... + LOG.info("Port conflict when starting Kafka Broker. Retrying..."); + } + else { + throw e; + } + } + } + + throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..fbeb110 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/63859c64/flink-streaming-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml index 113109b..78e39ca 100644 --- a/flink-streaming-connectors/pom.xml +++ b/flink-streaming-connectors/pom.xml @@ -40,6 +40,7 @@ under the License. <module>flink-connector-kafka-base</module> <module>flink-connector-kafka-0.8</module> <module>flink-connector-kafka-0.9</module> + <module>flink-connector-kafka-0.10</module> <module>flink-connector-elasticsearch</module> <module>flink-connector-elasticsearch2</module> <module>flink-connector-rabbitmq</module>
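With the new flink-connector-kafka-0.10 module registered above, a user job depends on the flink-connector-kafka-0.10_2.10 artifact and reads from a 0.10 broker through FlinkKafkaConsumer010. The sketch below is illustrative only: the broker address, group id and topic are assumptions, and the single-topic constructor is assumed to mirror the 0.9 consumer; the property keys match the standardProps set up by KafkaTestEnvironmentImpl.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka010ReadSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.setProperty("group.id", "example-group");           // assumed consumer group
            props.setProperty("auto.offset.reset", "earliest");       // same key and value as standardProps

            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props));

            stream.print();
            env.execute("Kafka 0.10 consumer sketch");
        }
    }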