[
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138014#comment-16138014
]
ASF GitHub Bot commented on FLINK-6988:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4239#discussion_r134676224
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the
+ * producer will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * <p>Implementation note: This producer is a hybrid between a regular
+ * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b).
+ *
+ * <p>Details about approach (a):
+ * Because of limitations of the regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} APIs, this
+ * variant does not allow accessing the timestamp attached to the record.
+ *
+ * <p>Details about approach (b):
+ * Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the
+ * {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal
+ * record timestamp of the record and write it to Kafka.
+ *
+ * <p>All methods and constructors in this class are marked with the approach they are needed for.
+ */
+public class FlinkKafkaProducer011<IN>
+		extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState> {
+
+ /**
+ * Semantics that can be chosen.
+ * <li>{@link #EXACTLY_ONCE}</li>
+ * <li>{@link #AT_LEAST_ONCE}</li>
+ * <li>{@link #NONE}</li>
+ */
+ public enum Semantic {
+ /**
+ * Semantic.EXACTLY_ONCE: the Flink producer will write all messages in a Kafka transaction that will be
+ * committed to Kafka on a checkpoint.
+ *
+ * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each
+ * checkpoint a new Kafka transaction is created, which is committed on
+ * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
+ * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
+ * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
+ * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint.
+ * To decrease the chances of failing checkpoints there are four options:
+ * <li>decrease the number of max concurrent checkpoints</li>
+ * <li>make checkpoints more reliable (so that they complete faster)</li>
+ * <li>increase the delay between checkpoints</li>
+ * <li>increase the size of the {@link FlinkKafkaProducer}s pool</li>
+ */
+ EXACTLY_ONCE,
+ /**
+ * Semantic.AT_LEAST_ONCE: the Flink producer will wait for all outstanding messages in the Kafka buffers
+ * to be acknowledged by the Kafka producer on a checkpoint.
+ */
+ AT_LEAST_ONCE,
+ /**
+ * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
+ * of failure.
+ */
+ NONE
+ }
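+
+	/*
+	 * A minimal usage sketch for picking the semantic and pool size explicitly
+	 * (illustrative only; assumes a broker reachable at localhost:9092 and a
+	 * org.apache.flink.streaming.util.serialization.SimpleStringSchema):
+	 *
+	 *   Properties props = new Properties();
+	 *   props.setProperty("bootstrap.servers", "localhost:9092");
+	 *   FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
+	 *       "my-topic",
+	 *       new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+	 *       props,
+	 *       new FlinkFixedPartitioner<String>(),
+	 *       Semantic.EXACTLY_ONCE,
+	 *       DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+	 */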
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer011.class);
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
+ */
+ public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+
+ /**
+ * Configuration key for disabling the metrics reporting.
+ */
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+ /**
+ * Descriptor of the transactionalIds list.
+ */
+	private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR =
+		new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class));
+
+ /**
+ * Pool of transactional ids backed up in state.
+ */
+ private ListState<String> transactionalIdsState;
+
+ /**
+ * Already used transactional ids.
+ */
+ private final Set<String> usedTransactionalIds = new HashSet<>();
+
+ /**
+ * Transactional ids that are available for use.
+ */
+	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
+
+ /**
+ * User defined properties for the Producer.
+ */
+ private final Properties producerConfig;
+
+ /**
+ * The name of the default topic this producer is writing data to.
+ */
+ private final String defaultTopicId;
+
+ /**
+ * (Serializable) SerializationSchema for turning objects used with Flink into
+ * byte[] for Kafka.
+ */
+ private final KeyedSerializationSchema<IN> schema;
+
+ /**
+ * User-provided partitioner for assigning an object to a Kafka partition for each topic.
+ */
+ private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+ /**
+ * Partitions of each topic.
+ */
+ private final Map<String, int[]> topicPartitionsMap;
+
+ /**
+ * Max number of producers in the pool. If all producers are in use, snapshotting state will throw an exception.
+ */
+ private final int kafkaProducersPoolSize;
+
+ /**
+ * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+ */
+ private boolean writeTimestampToKafka = false;
+
+ /**
+ * Flag indicating whether to accept failures (and log them), or to fail on failures.
+ */
+ private boolean logFailuresOnly;
+
+ /**
+ * Semantic chosen for this instance.
+ */
+ private Semantic semantic;
+
+ /**
+ * Pool of KafkaProducers objects.
+ */
+ private transient ProducersPool producersPool = new ProducersPool();
+
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** The callback that handles error propagation or logging. */
+ @Nullable
+ private transient Callback callback;
+
+ /** Errors encountered in the async producer are stored here. */
+ @Nullable
+ private transient volatile Exception asyncException;
+
+ /** Lock for accessing the pending records. */
+	private final SerializableObject pendingRecordsLock = new SerializableObject();
+
+ /** Number of unacknowledged records. */
+ private final AtomicLong pendingRecords = new AtomicLong();
+
+	/** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
+	private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
+
+	// ---------------------- "Constructors" for timestamp writing ------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ */
+	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+			DataStream<IN> inStream,
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization schema.
+ * @param producerConfig Properties with the producer configuration.
+ */
+	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+			DataStream<IN> inStream,
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a
+ *                            kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
+ *                       required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+			DataStream<IN> inStream,
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			FlinkKafkaPartitioner<IN> customPartitioner) {
+ return writeToKafkaWithTimestamps(
+ inStream,
+ topicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner,
+ Semantic.EXACTLY_ONCE,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a
+ *                            kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
+ *                       required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
+ */
+	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+			DataStream<IN> inStream,
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			FlinkKafkaPartitioner<IN> customPartitioner,
+			Semantic semantic,
+			int kafkaProducersPoolSize) {
+
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+ FlinkKafkaProducer011<IN> kafkaProducer =
+ new FlinkKafkaProducer011<>(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner,
+ semantic,
+ kafkaProducersPoolSize);
+ KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer);
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.11.x", objectTypeInfo, streamSink);
+		return new FlinkKafkaProducer011Configuration<>(transformation, streamSink);
+ }
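+
+	/*
+	 * For example, wiring approach (b) into a pipeline (illustrative only;
+	 * assumes a DataStream<String> named "stream", producer Properties named
+	 * "props", and a SimpleStringSchema):
+	 *
+	 *   FlinkKafkaProducer011.writeToKafkaWithTimestamps(
+	 *       stream, "my-topic", new SimpleStringSchema(), props);
+	 */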
+
+	// ---------------------- Regular constructors w/o timestamp support ------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined (keyless) serialization schema.
+ */
+	public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined (keyless) serialization schema.
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ */
+	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ *			User defined serialization schema supporting key/value messages
+ */
+	public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ *			User defined serialization schema supporting key/value messages
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+	public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
+ }
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+ */
+	public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+ this(
+ defaultTopicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner,
+ Semantic.EXACTLY_ONCE,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
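+
+	/*
+	 * With approach (a) the producer is attached like any other sink and
+	 * defaults to Semantic.EXACTLY_ONCE (illustrative only; assumes a
+	 * DataStream<String> named "stream" and a SimpleStringSchema):
+	 *
+	 *   stream.addSink(new FlinkKafkaProducer011<String>(
+	 *       "localhost:9092", "my-topic", new SimpleStringSchema()));
+	 */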
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+ * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
+ */
+ public FlinkKafkaProducer011(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaPartitioner<IN> customPartitioner,
+ Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(
+ TypeInformation.of(KafkaTransactionState.class),
+			TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {}));
+
+ requireNonNull(defaultTopicId, "TopicID not set");
+		requireNonNull(serializationSchema, "serializationSchema not set");
+ requireNonNull(producerConfig, "producerConfig not set");
+ ClosureCleaner.clean(customPartitioner, true);
--- End diff --
This has been carried on from Kafka 0.10
> Add Apache Kafka 0.11 connector
> -------------------------------
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> Kafka 0.11 (it will be released very soon) adds support for transactions.
> Thanks to that, Flink might be able to implement a Kafka sink supporting
> "exactly-once" semantic. API changes and whole transactions support is
> described in
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. A new
> FlinkKafkaProducer011 would:
> * upon creation begin a transaction, store the transaction identifiers into the
> state, and write all incoming data to an output Kafka topic using that
> transaction
> * on `snapshotState` call, it would flush the data and record in the state
> that the current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete` we
> either abort this pending transaction (if not every participant successfully
> saved the snapshot) or restore and commit it.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)