[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138874#comment-16138874 ]
ASF GitHub Bot commented on FLINK-6988:
---------------------------------------
Github user rangadi commented on a diff in the pull request:
https://github.com/apache/flink/pull/4239#discussion_r134838729
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the
+ * producer will use the {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * <p>Implementation note: This producer is a hybrid between a regular
+ * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b).
+ *
+ * <p>Details about approach (a):
+ * Because of limitations of the regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} APIs,
+ * this variant does not allow accessing the timestamp attached to the record.
+ *
+ * <p>Details about approach (b):
+ * Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the
+ * {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal
+ * timestamp of the record and write it to Kafka.
+ *
+ * <p>All methods and constructors in this class are marked with the approach they are needed for.
+ */
+public class FlinkKafkaProducer011<IN>
+    extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState> {
+
+    /**
+     * Semantics that can be chosen.
+     * <li>{@link #EXACTLY_ONCE}</li>
+     * <li>{@link #AT_LEAST_ONCE}</li>
+     * <li>{@link #NONE}</li>
+     */
+    public enum Semantic {
+        /**
+         * Semantic.EXACTLY_ONCE: the Flink producer will write all messages in a Kafka transaction that will be
+         * committed to Kafka on a checkpoint.
+         *
+         * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each
+         * checkpoint a new Kafka transaction is created, which is committed on
+         * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
+         * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
+         * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
+         * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint.
+         * To decrease the chance of failing checkpoints there are four options:
+         * <li>decrease number of max concurrent checkpoints</li>
+         * <li>make checkpoints more reliable (so that they complete faster)</li>
+         * <li>increase the delay between checkpoints</li>
+         * <li>increase the size of the {@link FlinkKafkaProducer}s pool</li>
+         */
+        EXACTLY_ONCE,
+        /**
+         * Semantic.AT_LEAST_ONCE: the Flink producer will wait for all outstanding messages in the Kafka buffers
+         * to be acknowledged by the Kafka producer on a checkpoint.
+         */
+        AT_LEAST_ONCE,
+        /**
+         * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
+         * of failure.
+         */
+        NONE
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+ private static final long serialVersionUID = 1L;
+
+    /**
+     * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
+     */
+ public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+
+ /**
+ * Configuration key for disabling the metrics reporting.
+ */
+    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+    /**
+     * Descriptor of the transactionalIds list.
+     */
+    private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR =
+        new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class));
+
+    /**
+     * Pool of transactional ids backed up in state.
+     */
+ private ListState<String> transactionalIdsState;
+
+ /**
+ * Already used transactional ids.
+ */
+ private final Set<String> usedTransactionalIds = new HashSet<>();
+
+    /**
+     * Transactional ids that are available for use.
+     */
+    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
+
+ /**
+ * User defined properties for the Producer.
+ */
+ private final Properties producerConfig;
+
+ /**
+ * The name of the default topic this producer is writing data to.
+ */
+ private final String defaultTopicId;
+
+    /**
+     * (Serializable) SerializationSchema for turning objects used with Flink into
+     * byte[] for Kafka.
+     */
+    private final KeyedSerializationSchema<IN> schema;
+
+ /**
+     * User-provided partitioner for assigning an object to a Kafka partition for each topic.
+ */
+ private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+ /**
+ * Partitions of each topic.
+ */
+ private final Map<String, int[]> topicPartitionsMap;
+
+    /**
+     * Max number of producers in the pool. If all producers are in use, snapshotting state will throw an exception.
+     */
+    private final int kafkaProducersPoolSize;
+
+    /**
+     * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+     */
+    private boolean writeTimestampToKafka = false;
+
+    /**
+     * Flag indicating whether to accept failures (and log them), or to fail on failures.
+     */
+    private boolean logFailuresOnly;
+
+ /**
+ * Semantic chosen for this instance.
+ */
+ private Semantic semantic;
+
+ /**
+ * Pool of KafkaProducers objects.
+ */
+ private transient ProducersPool producersPool = new ProducersPool();
+
+    // -------------------------------- Runtime fields ------------------------------------------
+
+    /** The callback that handles error propagation or logging callbacks. */
+ @Nullable
+ private transient Callback callback;
+
+ /** Errors encountered in the async producer are stored here. */
+ @Nullable
+ private transient volatile Exception asyncException;
+
+ /** Lock for accessing the pending records. */
+    private final SerializableObject pendingRecordsLock = new SerializableObject();
+
+ /** Number of unacknowledged records. */
+ private final AtomicLong pendingRecords = new AtomicLong();
+
+    /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
+    private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
+
+    // ---------------------- "Constructors" for timestamp writing ------------------
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+     *
+     * @param inStream The stream to write to Kafka
+     * @param topicId ID of the Kafka topic.
+     * @param serializationSchema User defined serialization schema supporting key/value messages
+     * @param producerConfig Properties with the producer configuration.
+     */
+    public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+            DataStream<IN> inStream,
+            String topicId,
+            KeyedSerializationSchema<IN> serializationSchema,
+            Properties producerConfig) {
+        return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+     *
+     * @param inStream The stream to write to Kafka
+     * @param topicId ID of the Kafka topic.
+     * @param serializationSchema User defined (keyless) serialization schema.
+     * @param producerConfig Properties with the producer configuration.
+     */
+    public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+            DataStream<IN> inStream,
+            String topicId,
+            SerializationSchema<IN> serializationSchema,
+            Properties producerConfig) {
+        return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+     *
+     * @param inStream The stream to write to Kafka
+     * @param topicId The name of the target topic
+     * @param serializationSchema A serializable serialization schema for turning user objects into a
+     *                            kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
+     *                       required argument.
+     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+     */
+    public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+            DataStream<IN> inStream,
+            String topicId,
+            KeyedSerializationSchema<IN> serializationSchema,
+            Properties producerConfig,
+            FlinkKafkaPartitioner<IN> customPartitioner) {
+        return writeToKafkaWithTimestamps(
+            inStream,
+            topicId,
+            serializationSchema,
+            producerConfig,
+            customPartitioner,
+            Semantic.EXACTLY_ONCE,
+            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * <p>This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+     *
+     * @param inStream The stream to write to Kafka
+     * @param topicId The name of the target topic
+     * @param serializationSchema A serializable serialization schema for turning user objects into a
+     *                            kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
+     *                       required argument.
+     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+     * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
+     * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
+     */
+    public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
+            DataStream<IN> inStream,
+            String topicId,
+            KeyedSerializationSchema<IN> serializationSchema,
+            Properties producerConfig,
+            FlinkKafkaPartitioner<IN> customPartitioner,
+            Semantic semantic,
+            int kafkaProducersPoolSize) {
+
+        GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+        FlinkKafkaProducer011<IN> kafkaProducer =
+            new FlinkKafkaProducer011<>(
+                topicId,
+                serializationSchema,
+                producerConfig,
+                customPartitioner,
+                semantic,
+                kafkaProducersPoolSize);
+        KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer);
+        SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.11.x", objectTypeInfo, streamSink);
+        return new FlinkKafkaProducer011Configuration<>(transformation, streamSink);
+    }
+
+    // ---------------------- Regular constructors w/o timestamp support ------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a
DataStream to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined (keyless) serialization schema.
+ */
+    public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * @param topicId
+     * 			ID of the Kafka topic.
+     * @param serializationSchema
+     * 			User defined (keyless) serialization schema.
+     * @param producerConfig
+     * 			Properties with the producer configuration.
+     */
+    public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * @param topicId The topic to write data to
+     * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+     */
+    public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+    }
+
+    // ------------------- Key/Value serialization schema constructors ----------------------
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * @param brokerList
+     * 			Comma separated addresses of the brokers
+     * @param topicId
+     * 			ID of the Kafka topic.
+     * @param serializationSchema
+     * 			User defined serialization schema supporting key/value messages
+     */
+    public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+        this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+     * the topic.
+     *
+     * @param topicId
+     * 			ID of the Kafka topic.
+     * @param serializationSchema
+     * 			User defined serialization schema supporting key/value messages
+     * @param producerConfig
+     * 			Properties with the producer configuration.
+     */
+    public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+        this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
+    }
+
+    /**
+     * The main constructor for creating a FlinkKafkaProducer.
+     *
+     * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+     *
+     * @param defaultTopicId The default topic to write data to
+     * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+     */
+    public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+        this(
+            defaultTopicId,
+            serializationSchema,
+            producerConfig,
+            customPartitioner,
+            Semantic.EXACTLY_ONCE,
+            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+    }
+
+    /**
+     * The main constructor for creating a FlinkKafkaProducer.
+     *
+     * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+     *
+     * @param defaultTopicId The default topic to write data to
+     * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+     * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
+     * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
+     */
+    public FlinkKafkaProducer011(
+            String defaultTopicId,
+            KeyedSerializationSchema<IN> serializationSchema,
+            Properties producerConfig,
+            FlinkKafkaPartitioner<IN> customPartitioner,
+            Semantic semantic,
+            int kafkaProducersPoolSize) {
+        super(
+            TypeInformation.of(KafkaTransactionState.class),
+            TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {}));
+
+        requireNonNull(defaultTopicId, "TopicID not set");
+        requireNonNull(serializationSchema, "serializationSchema not set");
+        requireNonNull(producerConfig, "producerConfig not set");
+        ClosureCleaner.clean(customPartitioner, true);
+        ClosureCleaner.ensureSerializable(serializationSchema);
+
+        this.defaultTopicId = defaultTopicId;
+        this.schema = serializationSchema;
+        this.producerConfig = producerConfig;
+        this.flinkKafkaPartitioner = customPartitioner;
+        this.semantic = semantic;
+        this.kafkaProducersPoolSize = kafkaProducersPoolSize;
+
+        // set the producer configuration properties for the kafka record key and value serializers.
+        if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+            this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+        } else {
+            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        }
+
+        if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+            this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+        } else {
+            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        }
+
+        // eagerly ensure that bootstrap servers are set.
+        if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+            throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
+        }
+
+        this.topicPartitionsMap = new HashMap<>();
+    }
+
+    // ---------------------------------- Properties --------------------------
+
+    /**
+     * Defines whether the producer should fail on errors, or only log them.
+     * If this is set to true, exceptions will only be logged; if set to false,
+     * exceptions will eventually be thrown and cause the streaming program to
+     * fail (and enter recovery).
+     *
+     * <p>This method is only accessible for approach (a) (see above).
+     *
+     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+     */
+    public void setLogFailuresOnly(boolean logFailuresOnly) {
+        this.logFailuresOnly = logFailuresOnly;
+    }
+
+    // ----------------------------------- Utilities --------------------------
+
+ /**
+ * Initializes the connection to Kafka.
+ *
+ * <p>This method is used for approach (a) (see above).
+ */
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+            LOG.warn(String.format("Using [%s] semantic, but checkpointing is not enabled. Switching to [%s] semantic.", semantic, Semantic.NONE));
+            semantic = Semantic.NONE;
+        }
+
+        if (logFailuresOnly) {
+            callback = new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata metadata, Exception e) {
+                    if (e != null) {
+                        LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+                    }
+                    acknowledgeMessage();
+                }
+            };
+        }
+        else {
+            callback = new Callback() {
+                @Override
+                public void onCompletion(RecordMetadata metadata, Exception exception) {
+                    if (exception != null && asyncException == null) {
+                        asyncException = exception;
+                    }
+                    acknowledgeMessage();
+                }
+            };
+        }
+
+        super.open(configuration);
+    }
+
+    @Override
+    public void invoke(KafkaTransactionState transaction, IN next) throws Exception {
+        invokeInternal(transaction, next, Long.MAX_VALUE);
+    }
+
+    private void invokeInternal(KafkaTransactionState transaction, IN next, long elementTimestamp) throws Exception {
+        checkErroneous();
+
+        byte[] serializedKey = schema.serializeKey(next);
+        byte[] serializedValue = schema.serializeValue(next);
+        String targetTopic = schema.getTargetTopic(next);
+        if (targetTopic == null) {
+            targetTopic = defaultTopicId;
+        }
+
+        Long timestamp = null;
+        if (this.writeTimestampToKafka) {
+            timestamp = elementTimestamp;
+        }
+
+        ProducerRecord<byte[], byte[]> record;
+        int[] partitions = topicPartitionsMap.get(targetTopic);
+        if (null == partitions) {
+            partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+            topicPartitionsMap.put(targetTopic, partitions);
+        }
+        if (flinkKafkaPartitioner == null) {
+            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+        } else {
+            record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
+        }
+        pendingRecords.incrementAndGet();
+        transaction.producer.send(record, callback);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (currentTransaction != null) {
+            // to avoid exceptions on aborting transactions with some pending records
+            flush(currentTransaction);
+        }
+        try {
+            super.close();
+        }
+        catch (Exception e) {
+            asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+        }
+        try {
+            producersPool.close();
+        }
+        catch (Exception e) {
+            asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+        }
+        // make sure we propagate pending errors
+        checkErroneous();
+    }
+
+    // ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+    @Override
+    protected KafkaTransactionState beginTransaction() throws Exception {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
+                if (producer == null) {
+                    String transactionalId = availableTransactionalIds.poll();
+                    if (transactionalId == null) {
+                        throw new Exception(
+                            "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
+                    }
+                    usedTransactionalIds.add(transactionalId);
+                    producer = initTransactionalProducer(transactionalId, true);
+                    producer.initTransactions();
+                }
+                producer.beginTransaction();
+                return new KafkaTransactionState(producer.getTransactionalId(), producer);
+            case AT_LEAST_ONCE:
+            case NONE:
+                // Do not create a new producer on each beginTransaction() if it is not necessary
+                if (currentTransaction != null && currentTransaction.producer != null) {
+                    return new KafkaTransactionState(currentTransaction.producer);
+                }
+                return new KafkaTransactionState(initProducer(true));
+            default:
+                throw new UnsupportedOperationException("Not implemented semantic");
+        }
+    }
+
+    @Override
+    protected void preCommit(KafkaTransactionState transaction) throws Exception {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+            case AT_LEAST_ONCE:
+                flush(transaction);
+                break;
+            case NONE:
+                break;
+            default:
+                throw new UnsupportedOperationException("Not implemented semantic");
+        }
+        checkErroneous();
+    }
+
+    @Override
+    protected void commit(KafkaTransactionState transaction) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                transaction.producer.commitTransaction();
+                producersPool.add(transaction.producer);
+                break;
+            case AT_LEAST_ONCE:
+            case NONE:
+                break;
+            default:
+                throw new UnsupportedOperationException("Not implemented semantic");
+        }
+    }
+
+    @Override
+    protected void recoverAndCommit(KafkaTransactionState transaction) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                KafkaTransactionState kafkaTransaction = transaction;
+                FlinkKafkaProducer<byte[], byte[]> producer =
+                    initTransactionalProducer(kafkaTransaction.transactionalId, false);
+                producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
+                try {
+                    producer.commitTransaction();
+                    producer.close();
+                }
+                catch (InvalidTxnStateException ex) {
+                    // That means we have committed this transaction before.
+                    LOG.warn("Encountered error {} while recovering transaction {}. " +
+                        "Presumably this transaction has already been committed before",
+                        ex,
+                        transaction);
+                }
+                break;
+            case AT_LEAST_ONCE:
+            case NONE:
+                break;
+            default:
+                throw new UnsupportedOperationException("Not implemented semantic");
+        }
+    }
+
+    @Override
+    protected void abort(KafkaTransactionState transaction) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                transaction.producer.abortTransaction();
+                producersPool.add(transaction.producer);
+                break;
+            case AT_LEAST_ONCE:
+            case NONE:
+                producersPool.add(transaction.producer);
+                break;
+            default:
+                throw new UnsupportedOperationException("Not implemented semantic");
+        }
+    }
+
+    @Override
+    protected void recoverAndAbort(KafkaTransactionState transaction) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                FlinkKafkaProducer<byte[], byte[]> producer =
+                    initTransactionalProducer(transaction.transactionalId, false);
+                producer.resumeTransaction(transaction.producerId, transaction.epoch);
+                producer.abortTransaction();
+                producer.close();
+                break;
+            case AT_LEAST_ONCE:
+            case NONE:
+                break;
+            default:
+                throw new UnsupportedOperationException("Not implemented semantic");
+        }
+    }
+
+ private void acknowledgeMessage() {
+ pendingRecords.decrementAndGet();
+ }
+
+    /**
+     * Flush pending records.
+     * @param transaction the transaction whose pending records should be flushed
+     */
+    private void flush(KafkaTransactionState transaction) throws Exception {
+        if (transaction.producer != null) {
+            transaction.producer.flush();
+        }
+        long pendingRecordsCount = pendingRecords.get();
+        if (pendingRecordsCount != 0) {
+            throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
+        }
+
+        // if the flushed requests have errors, we should propagate them and fail the checkpoint
+        checkErroneous();
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        transactionalIdsState.clear();
+        for (String transactionalId : availableTransactionalIds) {
+            transactionalIdsState.add(transactionalId);
+        }
+        for (String transactionalId : usedTransactionalIds) {
+            transactionalIdsState.add(transactionalId);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        availableTransactionalIds.clear();
+        for (int i = 0; i < kafkaProducersPoolSize; i++) {
+            availableTransactionalIds.add(UUID.randomUUID().toString());
--- End diff --
> we wouldn't know from which id we can start.
Not sure if you need a 'start id'. You can just abort all of them whether
there are any open transactions or not (in fact, if you open a new producer with
the id, Kafka aborts any that are open). This is mainly for clarification;
I will leave it to you guys to decide on the specifics.
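
For reference, a minimal, hypothetical sketch of that abort-on-reinitialization
behaviour, using only the plain Kafka 0.11 producer API (this is not part of the
PR; the class name, method name, transactional id and broker configuration are
illustrative assumptions):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

class TransactionalIdFencingSketch {

    // Hypothetical helper: re-creating a producer with a previously used
    // transactional.id and calling initTransactions() fences off the old
    // producer instance and aborts any transaction still open for that id.
    static void abortWhateverIsOpenFor(String transactionalId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("transactional.id", transactionalId);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // bumps the producer epoch; any open transaction for this id is aborted
            producer.initTransactions();
        }
    }
}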
> Add Apache Kafka 0.11 connector
> -------------------------------
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> Kafka 0.11 (it will be released very soon) adds support for transactions.
> Thanks to that, Flink might be able to implement a Kafka sink supporting the
> "exactly-once" semantic. The API changes and the overall transaction support are
> described in
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new
> FlinkKafkaProducer011 would
> * upon creation, begin a transaction, store the transaction identifiers in its
> state, and write all incoming data to an output Kafka topic using that
> transaction
> * on a `snapshotState` call, flush the data and record in state that the
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`,
> either abort this pending transaction (if not every participant successfully
> saved the snapshot) or restore and commit it.
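
For readers unfamiliar with the pattern, below is a rough, hypothetical sketch of
the lifecycle outlined above, loosely modelled on the hooks of Flink's
TwoPhaseCommitSinkFunction; the class name, type parameters and method signatures
are simplified placeholders, not the actual Flink or Kafka connector API:

// Simplified sketch of mapping Flink checkpoints onto Kafka transactions.
// IN is the record type, TXN stands in for a handle to an open Kafka transaction.
abstract class TwoPhaseCommitKafkaSinkSketch<IN, TXN> {

    // upon creation and after each checkpoint: begin a new Kafka transaction
    protected abstract TXN beginTransaction() throws Exception;

    // each incoming element is written to Kafka using the current transaction
    protected abstract void invoke(TXN transaction, IN value) throws Exception;

    // on snapshotState(): flush the data and record in Flink state that the
    // current transaction is pending commit
    protected abstract void preCommit(TXN transaction) throws Exception;

    // on notifyCheckpointComplete(): commit the pending transaction
    protected abstract void commit(TXN transaction);

    // after a crash between snapshotState() and notifyCheckpointComplete():
    // either restore and commit the pending transaction, or abort it
    protected abstract void recoverAndCommit(TXN transaction);

    protected abstract void recoverAndAbort(TXN transaction);
}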
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)