http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java new file mode 100644 index 0000000..9faa249 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. 
+ * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". + * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer + * has consumed a topic.</p> + * + * <p>Please refer to Kafka's documentation for the available configuration properties: + * http://kafka.apache.org/documentation.html#newconsumerconfigs</p> + * + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer + * is constructed. That means that the client that submits the program needs to be able to + * reach the Kafka brokers or ZooKeeper.</p> + */ +public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { + + // ------------------------------------------------------------------------ + + private static final long serialVersionUID = 2324564345203409112L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class); + + /** Configuration key to change the polling timeout **/ + public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; + + /** Boolean configuration key to disable metrics tracking **/ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now. 
+ */ + public static final long DEFAULT_POLL_TIMEOUT = 100L; + + /** User-supplied properties for Kafka **/ + private final Properties properties; + /** Ordered list of all partitions available in all subscribed partitions **/ + private final List<KafkaTopicPartition> partitionInfos; + + // ------ Runtime State ------- + + /** The partitions actually handled by this consumer at runtime */ + private transient List<TopicPartition> subscribedPartitions; + /** For performance reasons, we are keeping two representations of the subscribed partitions **/ + private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink; + /** The Kafka Consumer instance**/ + private transient KafkaConsumer<byte[], byte[]> consumer; + /** The thread running Kafka's consumer **/ + private transient ConsumerThread<T> consumerThread; + /** Exception set from the ConsumerThread */ + private transient Throwable consumerThreadException; + /** If the consumer doesn't have a Kafka partition assigned at runtime, it'll block on this waitThread **/ + private transient Thread waitThread; + + + // ------------------------------------------------------------------------ + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * @param topic + * The name of the topic that should be consumed. + * @param valueDeserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + this(Collections.singletonList(topic), valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. 
+ * + * @param topic + * The name of the topic that should be consumed. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { + this(Collections.singletonList(topic), deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * This constructor allows passing multiple topics to the consumer. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { + this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. 
+ */ + public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(deserializer, props); + checkNotNull(topics, "topics"); + this.properties = checkNotNull(props, "props"); + setDeserializer(this.properties); + KafkaConsumer<byte[], byte[]> consumer = null; + try { + consumer = new KafkaConsumer<>(this.properties); + this.partitionInfos = new ArrayList<>(); + for (final String topic: topics) { + // get partitions for each topic + List<PartitionInfo> partitionsForTopic = null; + for(int tri = 0; tri < 10; tri++) { + LOG.info("Trying to get partitions for topic {}", topic); + try { + partitionsForTopic = consumer.partitionsFor(topic); + if(partitionsForTopic != null && partitionsForTopic.size() > 0) { + break; // it worked + } + } catch (NullPointerException npe) { + // workaround for KAFKA-2880: Fetcher.getTopicMetadata NullPointerException when broker cannot be reached + // we ignore the NPE. + } + // create a new consumer + consumer.close(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + consumer = new KafkaConsumer<>(properties); + } + // for non existing topics, the list might be null. + if(partitionsForTopic != null) { + partitionInfos.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); + } + } + } finally { + if(consumer != null) { + consumer.close(); + } + } + if(partitionInfos.isEmpty()) { + throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); + } + + // we now have a list of partitions which is the same for all parallel consumer instances. + LOG.info("Got {} partitions from these topics: {}", partitionInfos.size(), topics); + + if (LOG.isInfoEnabled()) { + logPartitionInfo(partitionInfos); + } + } + + + /** + * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable) + * @param partitions A list of Kafka PartitionInfos. 
+ * @return A list of KafkaTopicPartitions + */ + public static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { + checkNotNull(partitions, "The given list of partitions was null"); + List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size()); + for(PartitionInfo pi: partitions) { + ret.add(new KafkaTopicPartition(pi.topic(), pi.partition())); + } + return ret; + } + + public static List<TopicPartition> convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) { + List<TopicPartition> ret = new ArrayList<>(partitions.size()); + for(KafkaTopicPartition ktp: partitions) { + ret.add(new TopicPartition(ktp.getTopic(), ktp.getPartition())); + } + return ret; + } + + // ------------------------------------------------------------------------ + // Source life cycle + // ------------------------------------------------------------------------ + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks(); + final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask(); + + // pick which partitions we work on + this.subscribedPartitionsAsFlink = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex); + if(this.subscribedPartitionsAsFlink.isEmpty()) { + LOG.info("This consumer doesn't have any partitions assigned"); + this.offsetsState = null; + return; + } else { + StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); + // if checkpointing is enabled, we are not automatically committing to Kafka. 
+ properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled())); + this.consumer = new KafkaConsumer<>(properties); + } + subscribedPartitions = convertToKafkaTopicPartition(subscribedPartitionsAsFlink); + + this.consumer.assign(this.subscribedPartitions); + + // register Kafka metrics to Flink accumulators + if(!Boolean.getBoolean(properties.getProperty(KEY_DISABLE_METRICS, "false"))) { + Map<MetricName, ? extends Metric> metrics = this.consumer.metrics(); + if(metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Consumer implementation does not support metrics"); + } else { + for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) { + String name = "consumer-" + metric.getKey().name(); + DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue()); + // best effort: we only add the accumulator if available. + if (kafkaAccumulator != null) { + getRuntimeContext().addAccumulator(name, kafkaAccumulator); + } + } + } + } + + // check if we need to explicitly seek to a specific offset (restore case) + if(restoreToOffset != null) { + // we are in a recovery scenario + for(Map.Entry<KafkaTopicPartition, Long> offset: restoreToOffset.entrySet()) { + // seek all offsets to the right position + this.consumer.seek(new TopicPartition(offset.getKey().getTopic(), offset.getKey().getPartition()), offset.getValue() + 1); + } + this.offsetsState = restoreToOffset; + } else { + this.offsetsState = new HashMap<>(); + } + } + + + + @Override + public void run(SourceContext<T> sourceContext) throws Exception { + if(consumer != null) { + consumerThread = new ConsumerThread<>(this, sourceContext); + consumerThread.start(); + // wait for the consumer to stop + while(consumerThread.isAlive()) { + if(consumerThreadException != null) { + throw new RuntimeException("ConsumerThread threw an exception", consumerThreadException); 
+ } + try { + consumerThread.join(50); + } catch (InterruptedException ie) { + consumerThread.shutdown(); + } + } + // check again for an exception + if(consumerThreadException != null) { + throw new RuntimeException("ConsumerThread threw an exception", consumerThreadException); + } + } else { + // this source never completes, so emit a Long.MAX_VALUE watermark + // to not block watermark forwarding + if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) { + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + + final Object waitLock = new Object(); + this.waitThread = Thread.currentThread(); + while (running) { + // wait until we are canceled + try { + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (waitLock) { + waitLock.wait(); + } + } + catch (InterruptedException e) { + // do nothing, check our "running" status + } + } + } + // close the context after the work was done. this can actually only + // happen when the fetcher decides to stop fetching + sourceContext.close(); + } + + @Override + public void cancel() { + // set ourselves as not running + running = false; + if(this.consumerThread != null) { + this.consumerThread.shutdown(); + } else { + // the consumer thread is not running, so we have to interrupt our own thread + if(waitThread != null) { + waitThread.interrupt(); + } + } + } + + @Override + public void close() throws Exception { + cancel(); + super.close(); + } + + // ------------------------------------------------------------------------ + // Checkpoint and restore + // ------------------------------------------------------------------------ + + + @Override + protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) { + Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets); + synchronized (this.consumer) { + this.consumer.commitSync(kafkaCheckpointOffsets); + } + } + + public static Map<TopicPartition, 
OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) { + Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(checkpointOffsets.size()); + for(Map.Entry<KafkaTopicPartition, Long> partitionOffset: checkpointOffsets.entrySet()) { + ret.put(new TopicPartition(partitionOffset.getKey().getTopic(), partitionOffset.getKey().getPartition()), + new OffsetAndMetadata(partitionOffset.getValue(), "")); + } + return ret; + } + + // ------------------------------------------------------------------------ + // Miscellaneous utilities + // ------------------------------------------------------------------------ + + + protected static void setDeserializer(Properties props) { + if (!props.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + + if (!props.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + } + } + + /** + * We use a separate thread for executing the KafkaConsumer.poll(timeout) call because Kafka is not + * handling interrupts properly. On an interrupt (which happens automatically by Flink if the task + * doesn't react to cancel() calls), the poll() method might never return. 
+ * On cancel, we'll wakeup the .poll() call and wait for it to return + */ + private static class ConsumerThread<T> extends Thread { + private final FlinkKafkaConsumer09<T> flinkKafkaConsumer; + private final SourceContext<T> sourceContext; + private boolean running = true; + + public ConsumerThread(FlinkKafkaConsumer09<T> flinkKafkaConsumer, SourceContext<T> sourceContext) { + this.flinkKafkaConsumer = flinkKafkaConsumer; + this.sourceContext = sourceContext; + } + + @Override + public void run() { + try { + long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT))); + pollLoop: while (running) { + ConsumerRecords<byte[], byte[]> records; + //noinspection SynchronizeOnNonFinalField + synchronized (flinkKafkaConsumer.consumer) { + try { + records = flinkKafkaConsumer.consumer.poll(pollTimeout); + } catch (WakeupException we) { + if (running) { + throw we; + } + // leave loop + continue; + } + } + // get the records for each topic partition + for (int i = 0; i < flinkKafkaConsumer.subscribedPartitions.size(); i++) { + TopicPartition partition = flinkKafkaConsumer.subscribedPartitions.get(i); + KafkaTopicPartition flinkPartition = flinkKafkaConsumer.subscribedPartitionsAsFlink.get(i); + List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition); + //noinspection ForLoopReplaceableByForEach + for (int j = 0; j < partitionRecords.size(); j++) { + ConsumerRecord<byte[], byte[]> record = partitionRecords.get(j); + T value = flinkKafkaConsumer.deserializer.deserialize(record.key(), record.value(), record.topic(), record.partition(),record.offset()); + if(flinkKafkaConsumer.deserializer.isEndOfStream(value)) { + // end of stream signaled + running = false; + break pollLoop; + } + synchronized (sourceContext.getCheckpointLock()) { + sourceContext.collect(value); + flinkKafkaConsumer.offsetsState.put(flinkPartition, record.offset()); + } + } + } + } + } catch(Throwable t) { + 
if(running) { + this.flinkKafkaConsumer.stopWithError(t); + } else { + LOG.debug("Stopped ConsumerThread threw exception", t); + } + } finally { + try { + flinkKafkaConsumer.consumer.close(); + } catch(Throwable t) { + LOG.warn("Error while closing consumer", t); + } + } + } + + /** + * Try to shutdown the thread + */ + public void shutdown() { + this.running = false; + this.flinkKafkaConsumer.consumer.wakeup(); + } + } + + private void stopWithError(Throwable t) { + this.consumerThreadException = t; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java new file mode 100644 index 0000000..6f7f687 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.util.Properties;


/**
 * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
 *
 * Please note that this producer does not have any reliability guarantees.
 *
 * @param <IN> Type of the messages to write into Kafka.
 */
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {

	private static final long serialVersionUID = 1L;

	// ------------------- Keyless serialization schema constructors ----------------------

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic. Partitions are assigned via a {@link FixedPartitioner}.
	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined (keyless) serialization schema.
	 */
	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic. Partitions are assigned via a {@link FixedPartitioner}.
	 *
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined (keyless) serialization schema.
	 * @param producerConfig
	 * 			Properties with the producer configuration.
	 */
	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
	 */
	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);

	}

	// ------------------- Key/Value serialization schema constructors ----------------------

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic. Partitions are assigned via a {@link FixedPartitioner}.
	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined serialization schema supporting key/value messages
	 */
	public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic. Partitions are assigned via a {@link FixedPartitioner}.
	 *
	 * @param topicId
	 * 			ID of the Kafka topic.
	 * @param serializationSchema
	 * 			User defined serialization schema supporting key/value messages
	 * @param producerConfig
	 * 			Properties with the producer configuration.
	 */
	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
	 * the topic.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
	 */
	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
		super(topicId, serializationSchema, producerConfig, customPartitioner);
	}
}
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + + +/** + * Read Strings from Kafka and print them to standard out. + * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! 
+ * + * Please pass the following arguments to run the example: + * --topic test --bootstrap.servers localhost:9092 --group.id myconsumer + * + */ +public class ReadFromKafka { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(4); + env.enableCheckpointing(5000); + env.setParallelism(2); + + ParameterTool parameterTool = ParameterTool.fromArgs(args); + + DataStream<String> messageStream = env + .addSource(new FlinkKafkaConsumer09<>( + parameterTool.getRequired("topic"), + new SimpleStringSchema(), + parameterTool.getProperties())); + + messageStream.print(); + + env.execute("Read from Kafka example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java new file mode 100644 index 0000000..fbe53fa --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + + +/** + * Generate a String every 500 ms and write it into a Kafka topic + * + * Please pass the following arguments to run the example: + * --topic test --bootstrap.servers localhost:9092 + * + */ +public class WriteIntoKafka { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(4); + env.setParallelism(2); + + ParameterTool parameterTool = ParameterTool.fromArgs(args); + + // very simple data generator + DataStream<String> messageStream = env.addSource(new SourceFunction<String>() { + public boolean running = true; + + @Override + public void run(SourceContext<String> ctx) throws Exception { + long i = 0; + while(this.running) { + ctx.collect("Element - " + i++); + Thread.sleep(500); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + // write data into Kafka + messageStream.addSink(new FlinkKafkaProducer09<>(parameterTool.getRequired("topic"), new 
SimpleStringSchema(), parameterTool.getProperties())); + + env.execute("Write into Kafka example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java new file mode 100644 index 0000000..55abaaa --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.Test; + + +public class Kafka09ITCase extends KafkaConsumerTestBase { + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + @Test(timeout = 60000) + public void testCheckpointing() throws Exception { + runCheckpointingTest(); + } + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + @Test(timeout = 60000) + public void testFailOnDeploy() throws Exception { + runFailOnDeployTest(); + } + + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- special executions --- + + @Test(timeout = 60000) + public 
void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runConsumeMultipleTopics(); + } + + @Test(timeout = 60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout = 60000) + public void testMetricsAndEndOfStream() throws Exception { + runMetricsAndEndOfStreamTest(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java new file mode 100644 index 0000000..1288347 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + + +import org.junit.Test; + + +@SuppressWarnings("serial") +public class Kafka09ProducerITCase extends KafkaProducerTestBase { + + @Test + public void testCustomPartitioning() { + runCustomPartitioningTest(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..a2c4f73 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.util.TestLogger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(FlinkKafkaProducerBase.class) +public class KafkaProducerTest extends TestLogger { + + @Test + @SuppressWarnings("unchecked") + public void testPropagateExceptions() { + try { + // mock kafka producer + KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class); + + // partition setup + when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( + Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null))); + + // failure when trying to send an element + when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) + .thenAnswer(new Answer<Future<RecordMetadata>>() { + @Override + public 
Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { + Callback callback = (Callback) invocation.getArguments()[1]; + callback.onCompletion(null, new Exception("Test error")); + return null; + } + }); + + // make sure the FlinkKafkaProducer instantiates our mock producer + whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); + + // (1) producer that propagates errors + + FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>( + "mock_topic", new SimpleStringSchema(), new Properties(), null); + + producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3)); + producerPropagating.open(new Configuration()); + + try { + producerPropagating.invoke("value"); + producerPropagating.invoke("value"); + fail("This should fail with an exception"); + } + catch (Exception e) { + assertNotNull(e.getCause()); + assertNotNull(e.getCause().getMessage()); + assertTrue(e.getCause().getMessage().contains("Test error")); + } + + // (2) producer that only logs errors + + FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>( + "mock_topic", new SimpleStringSchema(), new Properties(), null); + producerLogging.setLogFailuresOnly(true); + + producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3)); + producerLogging.open(new Configuration()); + + producerLogging.invoke("value"); + producerLogging.invoke("value"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java new file mode 100644 index 0000000..0855ba6 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.admin.AdminUtils; +import kafka.common.KafkaException; +import kafka.consumer.ConsumerConfig; +import kafka.api.PartitionMetadata; +import kafka.network.SocketServer; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.SystemTime$; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; + +import java.io.File; +import java.net.BindException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 0.9 + */ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private List<KafkaServer> brokers; + private TestingServer zookeeper; + private String zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private ConsumerConfig standardCC; + + + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + 
@Override + public ConsumerConfig getStandardConsumerConfig() { + return standardCC; + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public String getVersion() { + return "0.9"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + @Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new FlinkKafkaConsumer09<>(topics, readSchema, props); + } + + @Override + public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + return new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); + } + + @Override + public void restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId), KAFKA_HOST, zookeeperConnectionString)); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + ZkUtils zkUtils = getZkUtils(); + try { + PartitionMetadata firstPart = null; + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); + // not the first try. 
Sleep a bit + Thread.sleep(150); + } + + Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata(); + firstPart = partitionMetadata.head(); + } + while (firstPart.errorCode() != 0); + + return firstPart.leader().get().id(); + } finally { + zkUtils.close(); + } + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.config().brokerId(); + } + + @Override + public void prepare(int numKafkaServers) { + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(numKafkaServers); + for (int i = 0; i < numKafkaServers; i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + LOG.info("Starting Zookeeper"); + zookeeper = new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(numKafkaServers); + + for (int i = 0; i < numKafkaServers; i++) { + brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), KafkaTestEnvironment.KAFKA_HOST, zookeeperConnectionString)); + + SocketServer socketServer = brokers.get(i).socketServer(); + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + standardProps = new Properties(); + 
standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("auto.commit.enable", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis. + standardProps.setProperty("zookeeper.connection.timeout.ms", "20000"); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. + standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) + + Properties consumerConfigProps = new Properties(); + consumerConfigProps.putAll(standardProps); + consumerConfigProps.setProperty("auto.offset.reset", "smallest"); + standardCC = new ConsumerConfig(consumerConfigProps); + } + + @Override + public void shutdown() { + for (KafkaServer broker : brokers) { + if (broker != null) { + broker.shutdown(); + } + } + brokers.clear(); + + if (zookeeper != null) { + try { + zookeeper.stop(); + } + catch (Exception e) { + LOG.warn("ZK.stop() failed", e); + } + zookeeper = null; + } + + // clean up the temp spaces + + if (tmpKafkaParent != null && tmpKafkaParent.exists()) { + try { + FileUtils.deleteDirectory(tmpKafkaParent); + } + catch (Exception e) { + // ignore + } + } + if (tmpZkDir != null && tmpZkDir.exists()) { + try { + FileUtils.deleteDirectory(tmpZkDir); + } + catch (Exception e) { + // ignore + } + } + } + + public ZkUtils getZkUtils() { + ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), + standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + return ZkUtils.apply(creator, false); + } + + @Override + public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { + // create topic with one client + Properties topicConfig = new Properties(); + LOG.info("Creating topic {}", 
topic); + + ZkUtils zkUtils = getZkUtils(); + try { + AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig); + } finally { + zkUtils.close(); + } + + // validate that the topic has been created + final long deadline = System.currentTimeMillis() + 30000; + do { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // restore interrupted state + } + // we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are + // not always correct. + + // create a new ZK utils connection + ZkUtils checkZKConn = getZkUtils(); + if(AdminUtils.topicExists(checkZKConn, topic)) { + checkZKConn.close(); + return; + } + checkZKConn.close(); + } + while (System.currentTimeMillis() < deadline); + fail("Test topic could not be created"); + } + + @Override + public void deleteTestTopic(String topic) { + ZkUtils zkUtils = getZkUtils(); + try { + LOG.info("Deleting topic {}", topic); + + ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), + standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + + AdminUtils.deleteTopic(zkUtils, topic); + + zk.close(); + } finally { + zkUtils.close(); + } + } + + /** + * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) + */ + protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder, + String kafkaHost, + String zookeeperConnectionString) throws Exception { + Properties kafkaProperties = new Properties(); + + // properties have to be Strings + kafkaProperties.put("advertised.host.name", kafkaHost); + kafkaProperties.put("broker.id", Integer.toString(brokerId)); + kafkaProperties.put("log.dir", tmpFolder.toString()); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); + kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); + + // for CI stability, increase zookeeper 
session timeout + kafkaProperties.put("zookeeper.session.timeout.ms", "20000"); + + final int numTries = 5; + + for (int i = 1; i <= numTries; i++) { + int kafkaPort = NetUtils.getAvailablePort(); + kafkaProperties.put("port", Integer.toString(kafkaPort)); + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + try { + scala.Option<String> stringNone = scala.Option.apply(null); + KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone); + server.startup(); + return server; + } + catch (KafkaException e) { + if (e.getCause() instanceof BindException) { + // port conflict, retry... + LOG.info("Port conflict when starting Kafka Broker. Retrying..."); + } + else { + throw e; + } + } + } + + throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml new file mode 100644 index 0000000..354d353 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml @@ -0,0 +1,169 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-connectors-parent</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-base</artifactId> + <name>flink-connector-kafka-base</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.8.2.0</kafka.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + 
<artifactId>scala-reflect</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-annotation</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- force using the latest zkclient --> + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>0.7</version> + <type>jar</type> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>0.7</version> + </dependency> + </dependencies> + </dependencyManagement> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + 
<forkCount>1</forkCount> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java new file mode 100644 index 0000000..3c36586 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + + +public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> + implements CheckpointNotifier, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T> { + + // ------------------------------------------------------------------------ + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class); + + private static final long serialVersionUID = -6272159445203409112L; + + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */ + public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; + + + /** The schema to convert between Kafka's byte messages, and Flink's objects */ + protected final KeyedDeserializationSchema<T> deserializer; + + // ------ Runtime State ------- + + /** Data for pending but uncommitted checkpoints */ + protected final LinkedMap pendingCheckpoints = new LinkedMap(); + + /** The offsets of the last returned elements */ + protected transient HashMap<KafkaTopicPartition, 
Long> offsetsState; + + /** The offsets to restore to, if the consumer restores state from a checkpoint */ + protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset; + + /** Flag indicating whether the consumer is still running **/ + protected volatile boolean running = true; + + // ------------------------------------------------------------------------ + + + /** + * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler. + * + * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs + * at the beginning of this class.</p> + * + * @param deserializer + * The deserializer to turn raw byte messages into Java/Scala objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) { + this.deserializer = checkNotNull(deserializer, "valueDeserializer"); + } + + // ------------------------------------------------------------------------ + // Checkpoint and restore + // ------------------------------------------------------------------------ + + @Override + public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + if (offsetsState == null) { + LOG.debug("snapshotState() requested on not yet opened source; returning null."); + return null; + } + if (!running) { + LOG.debug("snapshotState() called on closed source"); + return null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state. 
Offsets: {}, checkpoint id: {}, timestamp: {}", + KafkaTopicPartition.toString(offsetsState), checkpointId, checkpointTimestamp); + } + + // the use of clone() is okay here, we just need a new map, the keys are not changed + //noinspection unchecked + HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone(); + + // the map cannot be asynchronously updated, because only one checkpoint call can happen + // on this function at a time: either snapshotState() or notifyCheckpointComplete() + pendingCheckpoints.put(checkpointId, currentOffsets); + + while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) { + pendingCheckpoints.remove(0); + } + + return currentOffsets; + } + + @Override + public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) { + LOG.info("Setting restore state in Kafka"); + restoreToOffset = restoredOffsets; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (offsetsState == null) { + LOG.debug("notifyCheckpointComplete() called on uninitialized source"); + return; + } + if (!running) { + LOG.debug("notifyCheckpointComplete() called on closed source"); + return; + } + + // only one commit operation must be in progress + if (LOG.isDebugEnabled()) { + LOG.debug("Committing offsets externally for checkpoint {}", checkpointId); + } + + try { + HashMap<KafkaTopicPartition, Long> checkpointOffsets; + + // the map may be asynchronously updated when snapshotting state, so we synchronize + synchronized (pendingCheckpoints) { + final int posInMap = pendingCheckpoints.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); + return; + } + + //noinspection unchecked + checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap); + + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + 
pendingCheckpoints.remove(0); + } + } + if (checkpointOffsets == null || checkpointOffsets.size() == 0) { + LOG.debug("Checkpoint state was empty."); + return; + } + commitOffsets(checkpointOffsets); + } + catch (Exception e) { + if (running) { + throw e; + } + // else ignore exception if we are no longer running + } + } + + protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception; + + + @Override + public TypeInformation<T> getProducedType() { + return deserializer.getProducedType(); + } + + protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) { + checkArgument(numConsumers > 0); + checkArgument(consumerIndex < numConsumers); + + List<T> partitionsToSub = new ArrayList<>(); + + for (int i = 0; i < partitions.size(); i++) { + if (i % numConsumers == consumerIndex) { + partitionsToSub.add(partitions.get(i)); + } + } + return partitionsToSub; + } + + /** + * Method to log partition information. 
+ * @param partitionInfos List of subscribed partitions + */ + public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) { + Map<String, Integer> countPerTopic = new HashMap<>(); + for (KafkaTopicPartition partition : partitionInfos) { + Integer count = countPerTopic.get(partition.getTopic()); + if (count == null) { + count = 1; + } else { + count++; + } + countPerTopic.put(partition.getTopic(), count); + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) { + sb.append(e.getKey()).append(" (").append(e.getValue()).append("), "); + } + // use a '{}' placeholder so the topic summary actually appears in the log output; + // without it, SLF4J silently drops the sb.toString() argument + LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString()); + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java new file mode 100644 index 0000000..ebc02c9 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import com.google.common.base.Preconditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +/** + * Flink Sink to produce data into a Kafka topic. + * + * Please note that this producer does not have any reliability guarantees. + * + * @param <IN> Type of the messages to write into Kafka. 
+ */ +public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); + + private static final long serialVersionUID = 1L; + + /** + * Configuration key for disabling the metrics reporting + */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Array with the partition ids of the given topicId + * The size of this array is the number of partitions + */ + protected final int[] partitions; + + /** + * User defined properties for the Producer + */ + protected final Properties producerConfig; + + /** + * The name of the topic this producer is writing data to + */ + protected final String topicId; + + /** + * (Serializable) SerializationSchema for turning objects used with Flink into + * byte[] for Kafka. + */ + protected final KeyedSerializationSchema<IN> schema; + + /** + * User-provided partitioner for assigning an object to a Kafka partition. + */ + protected final KafkaPartitioner<IN> partitioner; + + /** + * Flag indicating whether to accept failures (and log them), or to fail on failures + */ + protected boolean logFailuresOnly; + + // -------------------------------- Runtime fields ------------------------------------------ + + /** KafkaProducer instance */ + protected transient KafkaProducer<byte[], byte[]> producer; + + /** The callback that handles error propagation or logging callbacks */ + protected transient Callback callback; + + /** Errors encountered in the async producer are stored here */ + protected transient volatile Exception asyncException; + + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * @param topicId The topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' 
is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner + */ + public FlinkKafkaProducerBase(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + Preconditions.checkNotNull(topicId, "TopicID not set"); + Preconditions.checkNotNull(serializationSchema, "serializationSchema not set"); + Preconditions.checkNotNull(producerConfig, "producerConfig not set"); + ClosureCleaner.ensureSerializable(customPartitioner); + ClosureCleaner.ensureSerializable(serializationSchema); + + this.topicId = topicId; + this.schema = serializationSchema; + this.producerConfig = producerConfig; + + // set the producer configuration properties. + // NOTE: use containsKey, not contains - Properties.contains(Object) checks VALUES, + // so it would never detect a user-set serializer and would always overwrite it. + + if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + + // create a local KafkaProducer to get the list of partitions. + // this will also ensure locally that all required ProducerConfig values are set. 
+ try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) { + List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId); + + this.partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + // no explicit close() needed: try-with-resources closes the producer + } + + this.partitioner = customPartitioner; + } + + // ---------------------------------- Properties -------------------------- + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + // ----------------------------------- Utilities -------------------------- + + /** + * Initializes the connection to Kafka. + */ + @Override + public void open(Configuration configuration) { + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig); + + RuntimeContext ctx = getRuntimeContext(); + if(partitioner != null) { + partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); + } + + LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", + ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId); + + // register Kafka metrics to Flink accumulators + // NOTE: parseBoolean parses the property VALUE; Boolean.getBoolean would instead + // look up a JVM system property named by that value, ignoring the user's setting. + if(!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + Map<MetricName, ? extends Metric> metrics = this.producer.metrics(); + + if(metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Producer implementation does not support metrics"); + } else { + for(Map.Entry<MetricName, ? 
extends Metric> metric: metrics.entrySet()) { + String name = "producer-" + metric.getKey().name(); + DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue()); + // best effort: we only add the accumulator if available. + if(kafkaAccumulator != null) { + getRuntimeContext().addAccumulator(name, kafkaAccumulator); + } + } + } + } + + if (logFailuresOnly) { + callback = new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + if (e != null) { + LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); + } + } + }; + } + else { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null && asyncException == null) { + asyncException = exception; + } + } + }; + } + } + + /** + * Called when new data arrives to the sink, and forwards it to Kafka. + * + * @param next + * The incoming data + */ + @Override + public void invoke(IN next) throws Exception { + // propagate asynchronous errors + checkErroneous(); + + byte[] serializedKey = schema.serializeKey(next); + byte[] serializedValue = schema.serializeValue(next); + ProducerRecord<byte[], byte[]> record; + if(partitioner == null) { + record = new ProducerRecord<>(topicId, serializedKey, serializedValue); + } else { + record = new ProducerRecord<>(topicId, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); + } + + producer.send(record, callback); + } + + + @Override + public void close() throws Exception { + if (producer != null) { + producer.close(); + } + + // make sure we propagate pending errors + checkErroneous(); + } + + + // ----------------------------------- Utilities -------------------------- + + protected void checkErroneous() throws Exception { + Exception e = asyncException; + if (e != null) { + // prevent double throwing + asyncException = null; + throw new 
Exception("Failed to send data to Kafka: " + e.getMessage(), e); + } + } + + public static Properties getPropertiesFromBrokerList(String brokerList) { + String[] elements = brokerList.split(","); + + // validate the broker addresses + for (String broker: elements) { + NetUtils.getCorrectHostnamePort(broker); + } + + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + return props; + } +}
