[FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic
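The `SAFE_SCALE_DOWN_FACTOR` Javadoc in `FlinkKafkaProducer011` (further down in this commit) states two transactional-id ranges. A job may use ids in `[0, parallelism * kafkaProducersPoolSize)`. A job that starts without state aborts ids in `[0, parallelism * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR)`. The dependency-free Java sketch below only evaluates those two formulas. It shows when scaling down before the first checkpoint could leave lingering transactions. The class and method names are hypothetical and not part of Flink's API. The sketch also assumes that the old and new job use the same pool size.

```java
// Hypothetical helper, not part of Flink: evaluates the transactional-id ranges
// documented for FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR.
final class TransactionalIdRanges {

    // Value of FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR in this commit.
    static final int SAFE_SCALE_DOWN_FACTOR = 5;

    // Exclusive end of the ids a job may use: [0, parallelism * poolSize).
    static long usableRangeEnd(int parallelism, int poolSize) {
        return (long) parallelism * poolSize;
    }

    // Exclusive end of the ids a job aborts when it starts without state:
    // [0, parallelism * poolSize * SAFE_SCALE_DOWN_FACTOR).
    static long abortRangeEnd(int parallelism, int poolSize) {
        return usableRangeEnd(parallelism, poolSize) * SAFE_SCALE_DOWN_FACTOR;
    }

    // Assumes both jobs use the same pool size. The scale-down is safe when every id
    // the old job could have used falls inside the new job's abort range.
    static boolean isScaleDownSafe(int oldParallelism, int newParallelism, int poolSize) {
        return usableRangeEnd(oldParallelism, poolSize) <= abortRangeEnd(newParallelism, poolSize);
    }

    public static void main(String[] args) {
        int poolSize = 5; // DEFAULT_KAFKA_PRODUCERS_POOL_SIZE in this commit
        System.out.println(isScaleDownSafe(50, 10, poolSize)); // scale down by 5: true
        System.out.println(isScaleDownSafe(60, 10, poolSize)); // scale down by 6: false
    }
}
```

A factor of exactly `SAFE_SCALE_DOWN_FACTOR` is still covered. This matches the documentation's wording that only a factor *larger* than it is unsafe.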
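The documentation's caveat about `read_committed` consumers can be illustrated with a toy, in-memory model of a single partition. In this model, a reader may return only committed records that precede the first record of any still-open transaction. Kafka calls that boundary the last stable offset. The model is a simplification with several assumptions:

* It covers one partition only.
* Each write produces one record.
* The log contains no transaction markers.
* All class and method names are hypothetical, not Kafka or Flink API.

The `main` method replays the three-step sequence from the documentation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of one Kafka partition; not Kafka's implementation.
final class ReadCommittedModel {

    enum TxnStatus { OPEN, COMMITTED, ABORTED }

    // log.get(offset) is the transaction that wrote the record at that offset.
    private final List<String> log = new ArrayList<>();
    private final Map<String, TxnStatus> status = new HashMap<>();

    void begin(String txn) { status.put(txn, TxnStatus.OPEN); }

    void write(String txn) { log.add(txn); }

    void commit(String txn) { status.put(txn, TxnStatus.COMMITTED); }

    void abort(String txn) { status.put(txn, TxnStatus.ABORTED); }

    // First offset written by a still-open transaction, or the log end if none is open.
    int lastStableOffset() {
        for (int offset = 0; offset < log.size(); offset++) {
            if (status.get(log.get(offset)) == TxnStatus.OPEN) {
                return offset;
            }
        }
        return log.size();
    }

    // Offsets a read_committed reader may return: committed records below the LSO.
    List<Integer> visibleOffsets() {
        List<Integer> visible = new ArrayList<>();
        int lso = lastStableOffset();
        for (int offset = 0; offset < lso; offset++) {
            if (status.get(log.get(offset)) == TxnStatus.COMMITTED) {
                visible.add(offset);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        ReadCommittedModel partition = new ReadCommittedModel();
        partition.begin("transaction1");
        partition.write("transaction1");   // offset 0
        partition.begin("transaction2");
        partition.write("transaction2");   // offset 1
        partition.commit("transaction2");
        System.out.println(partition.visibleOffsets()); // [] (blocked by transaction1)
        partition.commit("transaction1");
        System.out.println(partition.visibleOffsets()); // [0, 1]
    }
}
```

Aborting `transaction1` instead of committing it would also unblock the reader. In that case only offset 1 becomes visible.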
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f651e9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f651e9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f651e9a

Branch: refs/heads/master
Commit: 2f651e9a69a9929ef154e7bf6fcba624b0e8b9a1
Parents: d20728b
Author: Piotr Nowojski
Authored: Fri Jun 23 09:14:28 2017 +0200
Committer: Aljoscha Krettek
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |   82 ++
 .../kafka/Kafka010ProducerITCase.java           |    9 +
 .../connectors/kafka/FlinkKafkaConsumer011.java |  113 ++
 .../connectors/kafka/FlinkKafkaProducer011.java | 1039 ++++++++++++++++++
 .../kafka/Kafka011AvroTableSource.java          |   58 +
 .../kafka/Kafka011JsonTableSource.java          |   53 +
 .../connectors/kafka/Kafka011TableSource.java   |   55 +
 .../metrics/KafkaMetricMuttableWrapper.java     |   43 +
 .../kafka/FlinkKafkaProducer011Tests.java       |  366 ++++++
 .../kafka/Kafka011AvroTableSourceTest.java      |   54 +
 .../connectors/kafka/Kafka011ITCase.java        |  353 ++++++
 .../kafka/Kafka011JsonTableSourceTest.java      |   49 +
 .../Kafka011ProducerAtLeastOnceITCase.java      |   44 +
 .../Kafka011ProducerExactlyOnceITCase.java      |   51 +
 .../kafka/KafkaTestEnvironmentImpl.java         |  497 +++++++++
 .../connectors/kafka/Kafka08ProducerITCase.java |    9 +
 .../connectors/kafka/Kafka09ProducerITCase.java |   10 +
 .../connectors/kafka/KafkaConsumerTestBase.java |    2 +-
 .../connectors/kafka/KafkaProducerTestBase.java |  100 +-
 .../connectors/kafka/KafkaTestBase.java         |   84 ++
 .../kafka/testutils/IntegerSource.java          |  130 +++
 21 files changed, 3170 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index f95c8c0..aabb1ba 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -72,6 +72,14 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
         <td>0.10.x</td>
         <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
     </tr>
+    <tr>
+        <td>flink-connector-kafka-0.11_2.11</td>
+        <td>1.4.0</td>
+        <td>FlinkKafkaConsumer011<br>
+        FlinkKafkaProducer011</td>
+        <td>0.11.x</td>
+        <td>Since 0.11.x, Kafka does not support Scala 2.10. This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">Kafka transactional messaging</a> to provide exactly-once semantics for the producer.</td>
+    </tr>
   </tbody>
 </table>
 
@@ -518,6 +526,80 @@ into a Kafka topic.
   for more explanation.
 </div>
 
+#### Kafka 0.11
+
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` can provide
+exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation
+by passing the appropriate `semantic` parameter to the `FlinkKafkaProducer011`:
+
+ * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be lost or they can
+   be duplicated.
+ * `Semantic.AT_LEAST_ONCE` (default setting): similar to `setFlushOnCheckpoint(true)` in
+   `FlinkKafkaProducer010`. This guarantees that no records will be lost (although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once semantics.
+
+<div class="alert alert-warning">
+  <strong>Attention:</strong> Depending on your Kafka configuration, even after Kafka acknowledges
+  writes you can still experience data loss. In particular, keep in mind the following properties
+  in the Kafka config:
+  <ul>
+    <li><tt>acks</tt></li>
+    <li><tt>log.flush.interval.messages</tt></li>
+    <li><tt>log.flush.interval.ms</tt></li>
+    <li><tt>log.flush.*</tt></li>
+  </ul>
+  Default values for the above options can easily lead to data loss. Please refer to the Kafka documentation
+  for more explanation.
+</div>
+
+
+##### Caveats
+
+`Semantic.EXACTLY_ONCE` mode relies on the ability to commit transactions
+that were started before taking a checkpoint, after recovering from said checkpoint. If the time
+between a Flink application crash and the completed restart is larger than Kafka's transaction timeout,
+there will be data loss (Kafka will automatically abort transactions that exceeded the timeout).
+With this in mind, please configure your transaction timeout according to your expected down
+times.
+
+Kafka brokers by default have `transaction.max.timeout.ms` set to 15 minutes. This property will
+not allow producers to set transaction timeouts larger than its value.
+`FlinkKafkaProducer011` by default sets the `transaction.timeout.ms` property in the producer config to
+1 hour, thus `transaction.max.timeout.ms` should be increased before using the
+`Semantic.EXACTLY_ONCE` mode.
+
+In `read_committed` mode of `KafkaConsumer`, any transactions that were not finished
+(neither aborted nor completed) will block all reads from the given Kafka topic past any
+unfinished transaction. In other words, after the following sequence of events:
+
+1. User started `transaction1` and wrote some records using it
+2. User started `transaction2` and wrote some further records using it
+3. User committed `transaction2`
+
+Even if records from `transaction2` are already committed, they will not be visible to
+the consumers until `transaction1` is committed or aborted. This has two implications:
+
+ * First of all, during normal operation of Flink applications, users can expect a delay in visibility
+   of the records produced into Kafka topics, equal to the average time between completed checkpoints.
+ * Secondly, in case of a Flink application failure, topics into which this application was writing
+   will be blocked for the readers until the application restarts or the configured transaction
+   timeout passes. This remark only applies to cases where there are multiple
+   agents/applications writing to the same Kafka topic.
+
+**Note**: `Semantic.EXACTLY_ONCE` mode uses a fixed-size pool of KafkaProducers
+per `FlinkKafkaProducer011` instance. One of those producers is used per
+checkpoint. If the number of concurrent checkpoints exceeds the pool size, `FlinkKafkaProducer011`
+will throw an exception and will fail the whole application. Please configure max pool size and max
+number of concurrent checkpoints accordingly.
+
+**Note**: `Semantic.EXACTLY_ONCE` takes all possible measures to not leave any lingering transactions
+that would block the consumers from reading from a Kafka topic longer than necessary. However, in the
+event of a Flink application failure before the first checkpoint, after restarting such an application there
+is no information in the system about previous pool sizes. Thus it is unsafe to scale down a Flink
+application before the first checkpoint completes by a factor larger than `FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
+
 ## Using Kafka timestamps and Flink event time in Kafka 0.10
 
 Since Apache Kafka 0.10+, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index f811893..cf35a59 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -23,4 +23,13 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka010 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka010 does not support exactly once semantic
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
new file mode 100644
index 0000000..8d165c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.11.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once".
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ */
+public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
+
+	private static final long serialVersionUID = 2324564345203409112L;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(Collections.singletonList(topic), valueDeserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer011(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(Collections.singletonList(topic), deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x.
+	 *
+	 * <p>This constructor allows passing multiple topics to the consumer.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+ */ + public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(topics, deserializer, props); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java new file mode 100644 index 0000000..67e237d --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -0,0 +1,1039 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the producer
+ * will use {@link Semantic#AT_LEAST_ONCE} semantic. Before using {@link Semantic#EXACTLY_ONCE} please refer to Flink's
+ * Kafka connector documentation.
+ */
+public class FlinkKafkaProducer011<IN>
+		extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> {
+
+	/**
+	 * Semantics that can be chosen.
+	 * <li>{@link #EXACTLY_ONCE}</li>
+	 * <li>{@link #AT_LEAST_ONCE}</li>
+	 * <li>{@link #NONE}</li>
+	 */
+	public enum Semantic {
+
+		/**
+		 * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be
+		 * committed to Kafka on a checkpoint.
+		 *
+		 * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}s. For each interval
+		 * between checkpoints a new Kafka transaction is created, which is committed on
+		 * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
+		 * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
+		 * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
+		 * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint.
+		 * To decrease the chances of failing checkpoints there are four options:
+		 * <li>decrease number of max concurrent checkpoints</li>
+		 * <li>make checkpoints more reliable (so that they complete faster)</li>
+		 * <li>increase delay between checkpoints</li>
+		 * <li>increase size of {@link FlinkKafkaProducer}s pool</li>
+		 */
+		EXACTLY_ONCE,
+
+		/**
+		 * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers
+		 * to be acknowledged by the Kafka producer on a checkpoint.
+		 */
+		AT_LEAST_ONCE,
+
+		/**
+		 * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
+		 * of failure.
+		 */
+		NONE
+	}
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer011.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * This coefficient determines what is the safe scale down factor.
+	 *
+	 * <p>If the Flink application previously failed before first checkpoint completed or we are starting new batch
+	 * of {@link FlinkKafkaProducer011} from scratch without clean shutdown of the previous one,
+	 * {@link FlinkKafkaProducer011} doesn't know what was the set of previously used Kafka transactionalIds. In
+	 * that case, it will try to play safe and abort all of the possible transactionalIds from the range of:
+	 * {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) }
+	 *
+	 * <p>The range of transactional ids available for use is:
+	 * {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize) }
+	 *
+	 * <p>This means that if we decrease {@code getNumberOfParallelSubtasks()} by a factor larger than
+	 * {@code SAFE_SCALE_DOWN_FACTOR} we can be left with some lingering transactions.
+	 */
+	public static final int SAFE_SCALE_DOWN_FACTOR = 5;
+
+	/**
+	 * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
+	 */
+	public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+
+	/**
+	 * Default value for kafka transaction timeout.
+	 */
+	public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);
+
+	/**
+	 * Configuration key for disabling the metrics reporting.
+	 */
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+	/**
+	 * Descriptor of the transactionalIds list.
+	 */
+	private static final ListStateDescriptor<NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
+		new ListStateDescriptor<>("next-transactional-id-hint", TypeInformation.of(NextTransactionalIdHint.class));
+
+	/**
+	 * State for nextTransactionalIdHint.
+	 */
+	private transient ListState<NextTransactionalIdHint> nextTransactionalIdHintState;
+
+	/**
+	 * Hint for picking next transactional id.
+	 */
+	private NextTransactionalIdHint nextTransactionalIdHint;
+
+	/**
+	 * User defined properties for the Producer.
+	 */
+	private final Properties producerConfig;
+
+	/**
+	 * The name of the default topic this producer is writing data to.
+	 */
+	private final String defaultTopicId;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * byte[] for Kafka.
+	 */
+	private final KeyedSerializationSchema<IN> schema;
+
+	/**
+	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
+	 */
+	private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+	/**
+	 * Partitions of each topic.
+	 */
+	private final Map<String, int[]> topicPartitionsMap;
+
+	/**
+	 * Max number of producers in the pool. If all producers are in use, snapshotting state will throw an exception.
+	 */
+	private final int kafkaProducersPoolSize;
+
+	/**
+	 * Available transactional ids.
+	 */
+	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
+
+	/**
+	 * Pool of KafkaProducer objects.
+	 */
+	private transient ProducersPool producersPool = new ProducersPool();
+
+	/**
+	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+	 */
+	private boolean writeTimestampToKafka = false;
+
+	/**
+	 * Flag indicating whether to accept failures (and log them), or to fail on failures.
+	 */
+	private boolean logFailuresOnly;
+
+	/**
+	 * Semantic chosen for this instance.
+	 */
+	private Semantic semantic;
+
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** The callback that handles error propagation or logging callbacks. */
+	@Nullable
+	private transient Callback callback;
+
+	/** Errors encountered in the async producer are stored here. */
+	@Nullable
+	private transient volatile Exception asyncException;
+
+	/** Lock for accessing the pending records. */
+	private final SerializableObject pendingRecordsLock = new SerializableObject();
+
+	/** Number of unacknowledged records. */
+	private final AtomicLong pendingRecords = new AtomicLong();
+
+	/** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
+	private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + */ + public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + this( + topicId, + new KeyedSerializationSchemaWrapper<>(serializationSchema), + getPropertiesFromBrokerList(brokerList), + Optional.of(new FlinkFixedPartitioner<IN>())); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + this( + topicId, + new KeyedSerializationSchemaWrapper<>(serializationSchema), + producerConfig, + Optional.of(new FlinkFixedPartitioner<IN>())); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing Optional.empty(), we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *           Comma separated addresses of the brokers
+	 * @param topicId
+	 *           ID of the Kafka topic.
+	 * @param serializationSchema
+	 *           User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(
+			topicId,
+			serializationSchema,
+			getPropertiesFromBrokerList(brokerList),
+			Optional.of(new FlinkFixedPartitioner<IN>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 *           ID of the Kafka topic.
+	 * @param serializationSchema
+	 *           User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 *           Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(
+			topicId,
+			serializationSchema,
+			producerConfig,
+			Optional.of(new FlinkFixedPartitioner<IN>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 *           ID of the Kafka topic.
+	 * @param serializationSchema
+	 *           User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 *           Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer011(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Semantic semantic) {
+		this(topicId,
+			serializationSchema,
+			producerConfig,
+			Optional.of(new FlinkFixedPartitioner<IN>()),
+			semantic,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+	}
+
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing Optional.empty() will use Kafka's partitioner.
+	 */
+	public FlinkKafkaProducer011(
+			String defaultTopicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+		this(
+			defaultTopicId,
+			serializationSchema,
+			producerConfig,
+			customPartitioner,
+			Semantic.AT_LEAST_ONCE,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing Optional.empty() will use Kafka's partitioner.
+	 * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
+	 * @param kafkaProducersPoolSize Overrides the default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
+	 */
+	public FlinkKafkaProducer011(
+			String defaultTopicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+			Semantic semantic,
+			int kafkaProducersPoolSize) {
+		super(TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {}));
+
+		this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null");
+		this.schema = checkNotNull(serializationSchema, "serializationSchema is null");
+		this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
+		this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null);
+		this.semantic = checkNotNull(semantic, "semantic is null");
+		this.kafkaProducersPoolSize = kafkaProducersPoolSize;
+		checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
+
+		ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+
+		// set the producer configuration properties for kafka record key value serializers.
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + // eagerly ensure that bootstrap servers are set. + if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); + } + + // Properties.contains() checks values, not keys, so containsKey() is required here + if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { + long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds(); + checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout must be positive and fit into a 32-bit integer"); + this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout); + LOG.warn("Property [{}] not specified. Setting it to {}", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT); + } + + this.topicPartitionsMap = new HashMap<>(); + } + + // ---------------------------------- Properties -------------------------- + + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. + * Timestamps must be positive for Kafka to accept them. + * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+ */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.writeTimestampToKafka = writeTimestampToKafka; + } + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, exceptions will only be logged; if set to false, + * exceptions will eventually be thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + // ----------------------------------- Utilities -------------------------- + + /** + * Initializes the connection to Kafka. + */ + @Override + public void open(Configuration configuration) throws Exception { + if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE); + semantic = Semantic.NONE; + } + + if (logFailuresOnly) { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + if (e != null) { + LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); + } + acknowledgeMessage(); + } + }; + } + else { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null && asyncException == null) { + asyncException = exception; + } + acknowledgeMessage(); + } + }; + } + + super.open(configuration); + } + + @Override + public void invoke(KafkaTransactionState transaction, IN next, Context context) throws Exception { + checkErroneous(); + + byte[] serializedKey = schema.serializeKey(next); + byte[] serializedValue = schema.serializeValue(next); + String targetTopic = schema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } +
Long timestamp = null; + if (this.writeTimestampToKafka) { + timestamp = context.timestamp(); + } + + ProducerRecord<byte[], byte[]> record; + int[] partitions = topicPartitionsMap.get(targetTopic); + if (null == partitions) { + partitions = getPartitionsByTopic(targetTopic, transaction.producer); + topicPartitionsMap.put(targetTopic, partitions); + } + if (flinkKafkaPartitioner != null) { + record = new ProducerRecord<>( + targetTopic, + flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), + timestamp, + serializedKey, + serializedValue); + } else { + record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); + } + pendingRecords.incrementAndGet(); + transaction.producer.send(record, callback); + } + + @Override + public void close() throws Exception { + if (currentTransaction != null) { + // to avoid exceptions on aborting transactions with some pending records + flush(currentTransaction); + } + try { + super.close(); + } + catch (Exception e) { + asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); + } + try { + producersPool.close(); + } + catch (Exception e) { + asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); + } + // make sure we propagate pending errors + checkErroneous(); + } + + // ------------------- Logic for handling checkpoint flushing -------------------------- // + + @Override + protected KafkaTransactionState beginTransaction() throws Exception { + switch (semantic) { + case EXACTLY_ONCE: + FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool(); + producer.beginTransaction(); + return new KafkaTransactionState(producer.getTransactionalId(), producer); + case AT_LEAST_ONCE: + case NONE: + // Do not create new producer on each beginTransaction() if it is not necessary + if (currentTransaction != null && currentTransaction.producer != null) { + return new KafkaTransactionState(currentTransaction.producer); + } + 
+ return new KafkaTransactionState(initProducer(true)); + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception { + FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll(); + if (producer == null) { + String transactionalId = availableTransactionalIds.poll(); + if (transactionalId == null) { + throw new Exception( + "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."); + } + producer = initTransactionalProducer(transactionalId, true); + producer.initTransactions(); + } + return producer; + } + + @Override + protected void preCommit(KafkaTransactionState transaction) throws Exception { + switch (semantic) { + case EXACTLY_ONCE: + case AT_LEAST_ONCE: + flush(transaction); + break; + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + checkErroneous(); + } + + @Override + protected void commit(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + transaction.producer.commitTransaction(); + producersPool.add(transaction.producer); + break; + case AT_LEAST_ONCE: + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void recoverAndCommit(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + KafkaTransactionState kafkaTransaction = transaction; + // try-with-resources closes the producer even if committing the recovered transaction fails + try (FlinkKafkaProducer<byte[], byte[]> producer = + initTransactionalProducer(kafkaTransaction.transactionalId, false)) { + producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch); + producer.commitTransaction(); + } + catch (InvalidTxnStateException ex) { + // That means we have committed this transaction before. + LOG.warn("Encountered error {} while recovering transaction {}. 
" + "Presumably this transaction has already been committed.", + ex, + transaction); + } + break; + case AT_LEAST_ONCE: + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void abort(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + transaction.producer.abortTransaction(); + producersPool.add(transaction.producer); + break; + case AT_LEAST_ONCE: + case NONE: + producersPool.add(transaction.producer); + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void recoverAndAbort(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + // try-with-resources closes the producer even if aborting the recovered transaction fails + try (FlinkKafkaProducer<byte[], byte[]> producer = + initTransactionalProducer(transaction.transactionalId, false)) { + producer.resumeTransaction(transaction.producerId, transaction.epoch); + producer.abortTransaction(); + } + break; + case AT_LEAST_ONCE: + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + private void acknowledgeMessage() { + pendingRecords.decrementAndGet(); + } + + /** + * Flush pending records.
+ * @param transaction The transaction whose producer should be flushed. + */ + private void flush(KafkaTransactionState transaction) throws Exception { + if (transaction.producer != null) { + transaction.producer.flush(); + } + long pendingRecordsCount = pendingRecords.get(); + if (pendingRecordsCount != 0) { + throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount); + } + + // if the flushed requests have errors, we should propagate them as well and fail the checkpoint + checkErroneous(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + super.snapshotState(context); + + nextTransactionalIdHintState.clear(); + // To avoid duplication, only the first subtask keeps track of the next transactional id hint. Otherwise all of the + // subtasks would write exactly the same information. + if (getRuntimeContext().getIndexOfThisSubtask() == 0 && nextTransactionalIdHint != null) { + long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId; + + // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that + // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this + // scaling up.
+ if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) { + nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize; + } + + nextTransactionalIdHintState.add(new NextTransactionalIdHint( + getRuntimeContext().getNumberOfParallelSubtasks(), + nextFreeTransactionalId)); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( + NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); + + if (semantic != Semantic.EXACTLY_ONCE) { + nextTransactionalIdHint = null; + } else { + ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get()); + if (transactionalIdHints.size() > 1) { + throw new IllegalStateException( + "There should be at most one next transactional id hint written by the first subtask"); + } else if (transactionalIdHints.size() == 0) { + nextTransactionalIdHint = new NextTransactionalIdHint(0, 0); + + // this means that this is either: + // (1) the first execution of this application + // (2) the previous execution failed before the first checkpoint completed + // + // in case of (2) we have to abort all previous transactions, but we don't know what parallelism was used + // then, so we must guess using the currently configured pool size, the current parallelism and + // SAFE_SCALE_DOWN_FACTOR + long abortTransactionalIdStart = getRuntimeContext().getIndexOfThisSubtask(); + long abortTransactionalIdEnd = abortTransactionalIdStart + 1; + + abortTransactionalIdStart *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR; + abortTransactionalIdEnd *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR; + abortTransactions(LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd)); + } else { + nextTransactionalIdHint = transactionalIdHints.get(0); + } + } + + super.initializeState(context); + } + + @Override +
protected Optional<KafkaTransactionContext> initializeUserContext() { + if (semantic != Semantic.EXACTLY_ONCE) { + return Optional.empty(); + } + + Set<String> transactionalIds = generateNewTransactionalIds(); + resetAvailableTransactionalIdsPool(transactionalIds); + return Optional.of(new KafkaTransactionContext(transactionalIds)); + } + + private Set<String> generateNewTransactionalIds() { + Preconditions.checkState(nextTransactionalIdHint != null, + "nextTransactionalIdHint must be present for EXACTLY_ONCE"); + + // range of available transactional ids is: + // [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize) + // loop below picks in a deterministic way a subrange of those available transactional ids based on index of + // this subtask + int subtaskId = getRuntimeContext().getIndexOfThisSubtask(); + Set<String> transactionalIds = new HashSet<>(); + for (int i = 0; i < kafkaProducersPoolSize; i++) { + long transactionalId = nextTransactionalIdHint.nextFreeTransactionalId + subtaskId * kafkaProducersPoolSize + i; + transactionalIds.add(generateTransactionalId(transactionalId)); + } + LOG.info("Generated new transactionalIds {}", transactionalIds); + return transactionalIds; + } + + @Override + protected void finishRecoveringContext() { + cleanUpUserContext(); + resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds); + LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds); + } + + /** + * After initialization make sure that all previous transactions from the current user context have been completed. 
+ */ + private void cleanUpUserContext() { + if (!getUserContext().isPresent()) { + return; + } + abortTransactions(getUserContext().get().transactionalIds.stream()); + } + + private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) { + availableTransactionalIds.clear(); + for (String transactionalId : transactionalIds) { + availableTransactionalIds.add(transactionalId); + } + } + + // ----------------------------------- Utilities -------------------------- + + private void abortTransactions(LongStream transactionalIds) { + abortTransactions(transactionalIds.mapToObj(this::generateTransactionalId)); + } + + private void abortTransactions(Stream<String> transactionalIds) { + transactionalIds.forEach(transactionalId -> { + try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer = + initTransactionalProducer(transactionalId, false)) { + kafkaProducer.initTransactions(); + } + }); + } + + private String generateTransactionalId(long transactionalId) { + String transactionalIdFormat = getRuntimeContext().getTaskName() + "-%d"; + return String.format(transactionalIdFormat, transactionalId); + } + + int getTransactionCoordinatorId() { + if (currentTransaction == null || currentTransaction.producer == null) { + throw new IllegalArgumentException(); + } + return currentTransaction.producer.getTransactionCoordinatorId(); + } + + private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) { + producerConfig.put("transactional.id", transactionalId); + return initProducer(registerMetrics); + } + + private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) { + FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig); + + RuntimeContext ctx = getRuntimeContext(); + + if (flinkKafkaPartitioner != null) { + if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) { + ((FlinkKafkaDelegatePartitioner) 
flinkKafkaPartitioner).setPartitions( + getPartitionsByTopic(this.defaultTopicId, producer)); + } + flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); + } + + LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}", + ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId); + + // register Kafka metrics as gauges in Flink's metric group + if (registerMetrics && !Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + Map<MetricName, ? extends Metric> metrics = producer.metrics(); + + if (metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Producer implementation does not support metrics"); + } else { + final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer"); + for (Map.Entry<MetricName, ? extends Metric> entry: metrics.entrySet()) { + String name = entry.getKey().name(); + Metric metric = entry.getValue(); + + KafkaMetricMuttableWrapper wrapper = previouslyCreatedMetrics.get(name); + if (wrapper != null) { + wrapper.setKafkaMetric(metric); + } else { + // TODO: somehow merge metrics from all active producers?
+ wrapper = new KafkaMetricMuttableWrapper(metric); + previouslyCreatedMetrics.put(name, wrapper); + kafkaMetricGroup.gauge(name, wrapper); + } + } + } + } + return producer; + } + + private void checkErroneous() throws Exception { + Exception e = asyncException; + if (e != null) { + // prevent double throwing + asyncException = null; + throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); + } + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + producersPool = new ProducersPool(); + } + + private static Properties getPropertiesFromBrokerList(String brokerList) { + String[] elements = brokerList.split(","); + + // validate the broker addresses + for (String broker: elements) { + NetUtils.getCorrectHostnamePort(broker); + } + + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + return props; + } + + private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) { + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); + + // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks + Collections.sort(partitionsList, new Comparator<PartitionInfo>() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + int[] partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + + return partitions; + } + + /** + * State for handling transactions. 
+ */ + public static class KafkaTransactionState { + + private final transient FlinkKafkaProducer<byte[], byte[]> producer; + + @Nullable + public final String transactionalId; + + public final long producerId; + + public final short epoch; + + public KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) { + this.producer = producer; + this.transactionalId = transactionalId; + this.producerId = producer.getProducerId(); + this.epoch = producer.getEpoch(); + } + + public KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> producer) { + this.producer = producer; + this.transactionalId = null; + this.producerId = -1; + this.epoch = -1; + } + + @Override + public String toString() { + return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId); + } + } + + /** + * Context associated with this instance of the {@link FlinkKafkaProducer011}. Used for keeping track of the + * transactionalIds. + */ + public static class KafkaTransactionContext { + public final Set<String> transactionalIds; + + public KafkaTransactionContext(Set<String> transactionalIds) { + this.transactionalIds = transactionalIds; + } + } + + static class ProducersPool implements Closeable { + private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], byte[]>> pool = new LinkedBlockingDeque<>(); + + public void add(FlinkKafkaProducer<byte[], byte[]> producer) { + pool.add(producer); + } + + public FlinkKafkaProducer<byte[], byte[]> poll() { + return pool.poll(); + } + + @Override + public void close() { + while (!pool.isEmpty()) { + pool.poll().close(); + } + } + } + + /** + * Keeps the information required to deduce the next safe-to-use transactional id.
+ */ + public static class NextTransactionalIdHint { + public int lastParallelism = 0; + public long nextFreeTransactionalId = 0; + + public NextTransactionalIdHint() { + this(0, 0); + } + + public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) { + this.lastParallelism = parallelism; + this.nextFreeTransactionalId = nextFreeTransactionalId; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java new file mode 100644 index 0000000..edc37cb --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.11. + */ +public class Kafka011AvroTableSource extends KafkaAvroTableSource { + + /** + * Creates a Kafka 0.11 Avro {@link StreamTableSource} using a given {@link SpecificRecord}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param record Avro specific record. + */ + public Kafka011AvroTableSource( + String topic, + Properties properties, + Class<? extends SpecificRecordBase> record) { + + super( + topic, + properties, + record); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java new file mode 100644 index 0000000..471c2d2 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.11. + */ +public class Kafka011JsonTableSource extends KafkaJsonTableSource { + + /** + * Creates a Kafka 0.11 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param typeInfo Type information describing the result type. The field names are used + * to parse the JSON file and so are the types. 
+ */ + public Kafka011JsonTableSource( + String topic, + Properties properties, + TypeInformation<Row> typeInfo) { + + super(topic, properties, typeInfo); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java new file mode 100644 index 0000000..5eaea97 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.11. + */ +public class Kafka011TableSource extends Kafka09TableSource { + + /** + * Creates a Kafka 0.11 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param typeInfo Type information describing the result type. + */ + public Kafka011TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + TypeInformation<Row> typeInfo) { + + super(topic, properties, deserializationSchema, typeInfo); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java new file mode 100644 index 0000000..a22ff5c --- /dev/null +++
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal.metrics; + +import org.apache.flink.metrics.Gauge; + +import org.apache.kafka.common.Metric; + +/** + * Gauge for getting the current value of a Kafka metric. 
+ */ +public class KafkaMetricMuttableWrapper implements Gauge<Double> { + private org.apache.kafka.common.Metric kafkaMetric; + + public KafkaMetricMuttableWrapper(org.apache.kafka.common.Metric metric) { + this.kafkaMetric = metric; + } + + @Override + public Double getValue() { + return kafkaMetric.value(); + } + + public void setKafkaMetric(Metric kafkaMetric) { + this.kafkaMetric = kafkaMetric; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java new file mode 100644 index 0000000..51410da --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import kafka.server.KafkaServer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * IT cases for the {@link FlinkKafkaProducer011}. 
+ */ +@SuppressWarnings("serial") +public class FlinkKafkaProducer011Tests extends KafkaTestBase { + protected String transactionalId; + protected Properties extraProperties; + + protected TypeInformationSerializationSchema<Integer> integerSerializationSchema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema = + new KeyedSerializationSchemaWrapper<>(integerSerializationSchema); + + @Before + public void before() { + transactionalId = UUID.randomUUID().toString(); + extraProperties = new Properties(); + extraProperties.putAll(standardProps); + extraProperties.put("transactional.id", transactionalId); + extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("isolation.level", "read_committed"); + } + + @Test(timeout = 30000L) + public void testHappyPath() throws IOException { + String topicName = "flink-kafka-producer-happy-path"; + try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.commitTransaction(); + } + assertRecord(topicName, "42", "42"); + deleteTestTopic(topicName); + } + + @Test(timeout = 30000L) + public void testResumeTransaction() throws IOException { + String topicName = "flink-kafka-producer-resume-transaction"; + try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { + kafkaProducer.initTransactions(); +
kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.flush(); + long producerId = kafkaProducer.getProducerId(); + short epoch = kafkaProducer.getEpoch(); + + try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + + assertRecord(topicName, "42", "42"); + + // this shouldn't throw - in case of a network split, the old producer might attempt to commit its transaction + kafkaProducer.commitTransaction(); + + // this shouldn't fail either, for the same reason as above + try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + } + deleteTestTopic(topicName); + } + + @Test(timeout = 120_000L) + public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); + testHarness.initializeState(null); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + OperatorStateHandles snapshot = testHarness.snapshot(1, 3); + + int leaderId = kafkaServer.getLeaderToShutDown(topic); + failBroker(leaderId); + + try { + testHarness.processElement(44, 4); + testHarness.snapshot(2, 5); + assertFalse(true); + } + catch (Exception ex) { + // expected + } + try { + testHarness.close(); + } + catch (Exception ex) { + // ignored: the leader broker is down, so closing may fail + } + + kafkaServer.restartBroker(leaderId); + + testHarness = createTestHarness(topic); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.close(); + + 
assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + 
deleteTestTopic(topic); + } + + @Test(timeout = 120_000L) + public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception { + String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify"; + + Properties properties = createProperties(); + + FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( + topic, + integerKeyedSerializationSchema, + properties, + FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + IntSerializer.INSTANCE); + + testHarness1.setup(); + testHarness1.open(); + testHarness1.initializeState(null); + testHarness1.processElement(42, 0); + testHarness1.snapshot(0, 1); + testHarness1.processElement(43, 2); + int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId(); + OperatorStateHandles snapshot = testHarness1.snapshot(1, 3); + + failBroker(transactionCoordinatorId); + + try { + testHarness1.processElement(44, 4); + testHarness1.notifyOfCompletedCheckpoint(1); + testHarness1.close(); + } + catch (Exception ex) { + // Expected: any of the above operations may throw an exception. + } + finally { + kafkaServer.restartBroker(transactionCoordinatorId); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) { + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + } + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + + deleteTestTopic(topic); + } + + /** + * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure. + * If such transactions were left lingering, consumers would be unable to read committed records + * that were created after the lingering transaction.
+ */ + @Test(timeout = 120_000L) + public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3); + + testHarness.processElement(44, 4); + testHarness.snapshot(2, 5); + testHarness.processElement(45, 6); + + // do not close the previous testHarness, to make sure that closing does not clean anything up (in case of + // a failure there might not be any close) + testHarness = createTestHarness(topic); + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(46, 7); + testHarness.snapshot(4, 8); + testHarness.processElement(47, 9); + testHarness.notifyOfCompletedCheckpoint(4); + + // now we should have: + // - records 42 and 43 in committed transactions + // - aborted transactions with records 44 and 45 + // - committed transaction with record 46 + // - pending transaction with record 47 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L); + + testHarness.close(); + deleteTestTopic(topic); + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception { + Properties properties = createProperties(); + + FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( + topic, + integerKeyedSerializationSchema, + properties, + FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); + + return new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + IntSerializer.INSTANCE);
+ } + + private Properties createProperties() { + Properties properties = new Properties(); + properties.putAll(standardProps); + properties.putAll(secureProps); + properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true"); + return properties; + } + + @Test + public void testRecoverCommittedTransaction() throws Exception { + String topic = "flink-kafka-producer-recover-committed-transaction"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); // producerA - start transaction (txn) 0 + testHarness.processElement(42, 0); // producerA - write 42 in txn 0 + OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre-commit txn 0, producerB - start txn 1 + testHarness.processElement(43, 2); // producerB - write 43 in txn 1 + testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool + testHarness.snapshot(1, 3); // producerB - pre-commit txn 1, producerA - start txn 2 + testHarness.processElement(44, 4); // producerA - write 44 in txn 2 + testHarness.close(); // producerA - abort txn 2 + + testHarness = createTestHarness(topic); + testHarness.initializeState(checkpoint0); // recover state 0 - producerA recovers and commits txn 0 + testHarness.close(); + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L); + + deleteTestTopic(topic); + } + + @Test + public void testRunOutOfProducersInThePool() throws Exception { + String topic = "flink-kafka-run-out-of-producers"; + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) { + testHarness.processElement(i, i * 2); + testHarness.snapshot(i, i * 2 + 1); + } + } + catch (Exception ex) { + if (!ex.getCause().getMessage().startsWith("Too many ongoing")) { + throw ex; + }
+ } + deleteTestTopic(topic); + } + + // shut down a Kafka broker + private void failBroker(int brokerId) { + KafkaServer toShutDown = null; + for (KafkaServer server : kafkaServer.getBrokers()) { + + if (kafkaServer.getBrokerId(server) == brokerId) { + toShutDown = server; + break; + } + } + + if (toShutDown == null) { + StringBuilder listOfBrokers = new StringBuilder(); + for (KafkaServer server : kafkaServer.getBrokers()) { + listOfBrokers.append(kafkaServer.getBrokerId(server)); + listOfBrokers.append(" ; "); + } + + throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId + + " ; available brokers: " + listOfBrokers.toString()); + } else { + toShutDown.shutdown(); + toShutDown.awaitShutdown(); + } + } + + private void assertRecord(String topicName, String expectedKey, String expectedValue) { + try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { + kafkaConsumer.subscribe(Collections.singletonList(topicName)); + ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); + + ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); + assertEquals(expectedKey, record.key()); + assertEquals(expectedValue, record.value()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java new file mode 100644 index 0000000..e60bf17 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java @@ -0,0 
+1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Tests for the {@link Kafka011AvroTableSource}. + */ +public class Kafka011AvroTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + + return new Kafka011AvroTableSource( + topic, + properties, + AvroSpecificRecord.class); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) AvroRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer011.class; + } +} +
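The inline comments in `testRecoverCommittedTransaction` trace the transaction lifecycle that the exactly-once `FlinkKafkaProducer011` tests exercise. A transaction is pre-committed when a checkpoint is taken and committed only once that checkpoint completes. On recovery, the transactions recorded in the restored state are committed again, and any transaction missing from that state is never committed. Below is a hypothetical, in-memory Java sketch of that sequence, not Flink's or Kafka's implementation. `TwoPhaseCommitSketch`, `Broker`, `Sink`, `Txn` and `recoverCommittedTransactionScenario` are illustrative names only, and the model leaves out the producer pool, producer ids/epochs and explicit aborts. It assumes that committing the same transaction twice is harmless, which is the property `testResumeTransaction` checks against a real broker.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of pre-commit / commit / recover; uses no Flink or Kafka classes. */
public class TwoPhaseCommitSketch {

    /** Records of one transaction; invisible to read_committed readers until committed. */
    static final class Txn {
        final int id;
        final List<Integer> records = new ArrayList<>();

        Txn(int id) {
            this.id = id;
        }
    }

    /** Stand-in for the cluster: assigns transaction ids and stores committed records. */
    static final class Broker {
        private int nextTxnId = 0;
        private final Set<Integer> committedTxnIds = new HashSet<>();
        private final List<Integer> committedRecords = new ArrayList<>();

        Txn begin() {
            return new Txn(nextTxnId++);
        }

        // Assumed idempotent: re-committing an already committed transaction is a no-op.
        void commit(Txn txn) {
            if (committedTxnIds.add(txn.id)) {
                committedRecords.addAll(txn.records);
            }
        }

        List<Integer> committedRecords() {
            return committedRecords;
        }
    }

    /** One sink instance; its checkpoint state maps checkpoint ids to pre-committed transactions. */
    static final class Sink {
        private final Broker broker;
        private final Map<Long, Txn> pendingCommits = new LinkedHashMap<>();
        private Txn current;

        Sink(Broker broker) {
            this.broker = broker;
        }

        void open() {
            current = broker.begin();
        }

        void processElement(int value) {
            current.records.add(value);
        }

        // Pre-commit: park the current transaction under this checkpoint, start a new one,
        // and return a copy of the pending transactions as the checkpointed state.
        Map<Long, Txn> snapshot(long checkpointId) {
            pendingCommits.put(checkpointId, current);
            current = broker.begin();
            return new LinkedHashMap<>(pendingCommits);
        }

        // Commit: only once the checkpoint is known to be complete.
        void notifyOfCompletedCheckpoint(long checkpointId) {
            pendingCommits.entrySet().removeIf(entry -> {
                if (entry.getKey() <= checkpointId) {
                    broker.commit(entry.getValue());
                    return true;
                }
                return false;
            });
        }

        // Recovery: (re-)commit what the restored state had pre-committed; any other
        // transaction of the failed instance is never committed.
        void initializeState(Map<Long, Txn> restored) {
            restored.values().forEach(broker::commit);
        }
    }

    /** Mirrors the steps of testRecoverCommittedTransaction. */
    public static List<Integer> recoverCommittedTransactionScenario() {
        Broker broker = new Broker();
        Sink first = new Sink(broker);
        first.open();                                   // start txn 0
        first.processElement(42);                       // write 42 in txn 0
        Map<Long, Txn> checkpoint0 = first.snapshot(0); // pre-commit txn 0, start txn 1
        first.processElement(43);                       // write 43 in txn 1
        first.notifyOfCompletedCheckpoint(0);           // commit txn 0
        first.snapshot(1);                              // pre-commit txn 1, start txn 2
        first.processElement(44);                       // write 44 in txn 2, then "fail"

        Sink second = new Sink(broker);
        second.initializeState(checkpoint0);            // re-commit txn 0: a no-op
        return broker.committedRecords();
    }

    public static void main(String[] args) {
        System.out.println(recoverCommittedTransactionScenario()); // prints [42]
    }
}
```

Running `main` prints `[42]`. Record 43 was only pre-committed under checkpoint 1, which the restored `checkpoint0` state does not contain, and record 44 sat in a transaction that was never pre-committed. Neither ever becomes visible, which matches the assertion in the test.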
