[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136435#comment-16136435 ]
ASF GitHub Bot commented on FLINK-6988: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4239#discussion_r134398934 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -0,0 +1,1000 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.ExceptionUtils; +import 
org.apache.flink.util.NetUtils; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer + * will use {@link Semantic#EXACTLY_ONCE} semantic. + * + * <p>Implementation note: This producer is a hybrid between a regular regular + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b). + * + * <p>Details about approach (a): + * Because of regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} APIs limitations, this + * variant do not allow accessing the timestamp attached to the record. + * + * <p>Details about approach (b): + * Kafka 0.11 supports writing the timestamp attached to a record to Kafka. 
When using the + * {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal + * record timestamp of the record and write it to Kafka. + * + * <p>All methods and constructors in this class are marked with the approach they are needed for. + */ +public class FlinkKafkaProducer011<IN> + extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState> { + + /** + * Semantics that can be chosen. + * <li>{@link #EXACTLY_ONCE}</li> + * <li>{@link #AT_LEAST_ONCE}</li> + * <li>{@link #NONE}</li> + */ + public enum Semantic { + /** + * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be + * committed to the Kafka on a checkpoint. + * + * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each + * checkpoint there is created new Kafka transaction, which is being committed on + * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are + * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that + * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail + * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from previous checkpoint. + * To decrease chances of failing checkpoints there are three options: + * <li>decrease number of max concurrent checkpoints</li> + * <li>make checkpoints more reliable (so that they complete faster)</li> + * <li>increase delay between checkpoints</li> + * <li>increase size of {@link FlinkKafkaProducer}s pool</li> + */ + EXACTLY_ONCE, + /** + * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + */ + AT_LEAST_ONCE, + /** + * Semantic.NONE means that nothing will be guaranteed. 
Messages can be lost and/or duplicated in case + * of failure. + */ + NONE + } + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); + + private static final long serialVersionUID = 1L; + + /** + * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}. + */ + public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; + + /** + * Configuration key for disabling the metrics reporting. + */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Descriptor of the transacionalIds list. + */ + private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR = + new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class)); + + /** + * Pool of transacional ids backed up in state. + */ + private ListState<String> transactionalIdsState; + + /** + * Already used transactional ids. + */ + private final Set<String> usedTransactionalIds = new HashSet<>(); + + /** + * Available to use transactional ids. + */ + private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>(); + + /** + * User defined properties for the Producer. + */ + private final Properties producerConfig; + + /** + * The name of the default topic this producer is writing data to. + */ + private final String defaultTopicId; + + /** + * (Serializable) SerializationSchema for turning objects used with Flink into. + * byte[] for Kafka. + */ + private final KeyedSerializationSchema<IN> schema; + + /** + * User-provided partitioner for assigning an object to a Kafka partition for each topic. + */ + private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner; + + /** + * Partitions of each topic. + */ + private final Map<String, int[]> topicPartitionsMap; + + /** + * Max number of producers in the pool. If all producers are in use, snapshoting state will throw an exception. 
+ */ + private final int kafkaProducersPoolSize; + + /** + * Flag controlling whether we are writing the Flink record's timestamp into Kafka. + */ + private boolean writeTimestampToKafka = false; + + /** + * Flag indicating whether to accept failures (and log them), or to fail on failures. + */ + private boolean logFailuresOnly; + + /** + * Semantic chosen for this instance. + */ + private Semantic semantic; + + /** + * Pool of KafkaProducers objects. + */ + private transient ProducersPool producersPool = new ProducersPool(); + + // -------------------------------- Runtime fields ------------------------------------------ + + /** The callback than handles error propagation or logging callbacks. */ + @Nullable + private transient Callback callback; + + /** Errors encountered in the async producer are stored here. */ + @Nullable + private transient volatile Exception asyncException; + + /** Lock for accessing the pending records. */ + private final SerializableObject pendingRecordsLock = new SerializableObject(); + + /** Number of unacknowledged records. */ + private final AtomicLong pendingRecords = new AtomicLong(); + + /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */ + private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>(); + + // ---------------------- "Constructors" for timestamp writing ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. 
+ */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + * @param producerConfig Properties with the producer configuration. + */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + SerializationSchema<IN> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only + * required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. 
+ */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<IN> customPartitioner) { + return writeToKafkaWithTimestamps( + inStream, + topicId, + serializationSchema, + producerConfig, + customPartitioner, + Semantic.EXACTLY_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only + * required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). 
+ */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<IN> customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + FlinkKafkaProducer011<IN> kafkaProducer = + new FlinkKafkaProducer011<>( + topicId, + serializationSchema, + producerConfig, + customPartitioner, + semantic, + kafkaProducersPoolSize); + KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer); + SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.11.x", objectTypeInfo, streamSink); + return new FlinkKafkaProducer011Configuration<>(transformation, streamSink); + } + + // ---------------------- Regular constructors w/o timestamp support ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + */ + public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. 
+ */ + public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + */ + public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + */ + public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. 
+ * @param serializationSchema + * User defined serialization schema supporting key/value messages + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + */ + public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { + this( + defaultTopicId, + serializationSchema, + producerConfig, + customPartitioner, + Semantic.EXACTLY_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). + */ + public FlinkKafkaProducer011( + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<IN> customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + super( + TypeInformation.of(KafkaTransactionState.class), + TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {})); + + requireNonNull(defaultTopicId, "TopicID not set"); + requireNonNull(serializationSchema, "serializationSchema not set"); + requireNonNull(producerConfig, "producerConfig not set"); + ClosureCleaner.clean(customPartitioner, true); + ClosureCleaner.ensureSerializable(serializationSchema); + + this.defaultTopicId = defaultTopicId; + this.schema = serializationSchema; + this.producerConfig = producerConfig; + this.flinkKafkaPartitioner = customPartitioner; + this.semantic = semantic; + this.kafkaProducersPoolSize = kafkaProducersPoolSize; + + // set the producer configuration properties for kafka record key value serializers. 
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + // eagerly ensure that bootstrap servers are set. + if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); + } + + this.topicPartitionsMap = new HashMap<>(); + } + + // ---------------------------------- Properties -------------------------- + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * <p>Method is only accessible for approach (a) (see above) + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + // ----------------------------------- Utilities -------------------------- + + /** + * Initializes the connection to Kafka. + * + * <p>This method is used for approach (a) (see above). 
+ */ + @Override + public void open(Configuration configuration) throws Exception { + if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn(String.format("Using [%s] semantic, but checkpointing is not enabled. Switching to [%s] semantic.", semantic, Semantic.NONE)); + semantic = Semantic.NONE; + } + + if (logFailuresOnly) { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + if (e != null) { + LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); + } + acknowledgeMessage(); + } + }; + } + else { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null && asyncException == null) { + asyncException = exception; + } + acknowledgeMessage(); + } + }; + } + + super.open(configuration); + } + + @Override + public void invoke(KafkaTransactionState transaction, IN next) throws Exception { + invokeInternal(transaction, next, Long.MAX_VALUE); + } + + private void invokeInternal(KafkaTransactionState transaction, IN next, long elementTimestamp) throws Exception { + checkErroneous(); + + byte[] serializedKey = schema.serializeKey(next); + byte[] serializedValue = schema.serializeValue(next); + String targetTopic = schema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } + + Long timestamp = null; + if (this.writeTimestampToKafka) { + timestamp = elementTimestamp; + } + + ProducerRecord<byte[], byte[]> record; + int[] partitions = topicPartitionsMap.get(targetTopic); + if (null == partitions) { + partitions = getPartitionsByTopic(targetTopic, transaction.producer); + topicPartitionsMap.put(targetTopic, partitions); + } + if (flinkKafkaPartitioner == null) { + record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); + } else { + record = new ProducerRecord<>(targetTopic, 
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue); + } + pendingRecords.incrementAndGet(); + transaction.producer.send(record, callback); + } + + @Override + public void close() throws Exception { + if (currentTransaction != null) { + // to avoid exceptions on aborting transactions with some pending records + flush(currentTransaction); + } + try { + super.close(); + } + catch (Exception e) { + asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); + } + try { + producersPool.close(); + } + catch (Exception e) { + asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); + } + // make sure we propagate pending errors + checkErroneous(); + } + + // ------------------- Logic for handling checkpoint flushing -------------------------- // + + @Override + protected KafkaTransactionState beginTransaction() throws Exception { + switch (semantic) { + case EXACTLY_ONCE: + FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll(); + if (producer == null) { + String transactionalId = availableTransactionalIds.poll(); + if (transactionalId == null) { + throw new Exception( + "Too many ongoing snapshots. 
Increase kafka producers pool size or decrease number of concurrent checktpoins."); + } + usedTransactionalIds.add(transactionalId); + producer = initTransactionalProducer(transactionalId, true); + producer.initTransactions(); + } + producer.beginTransaction(); + return new KafkaTransactionState(producer.getTransactionalId(), producer); + case AT_LEAST_ONCE: + case NONE: + // Do not create new producer on each beginTransaction() if it is not necessary + if (currentTransaction != null && currentTransaction.producer != null) { + return new KafkaTransactionState(currentTransaction.producer); + } + return new KafkaTransactionState(initProducer(true)); + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void preCommit(KafkaTransactionState transaction) throws Exception { + switch (semantic) { + case EXACTLY_ONCE: + case AT_LEAST_ONCE: + flush(transaction); + break; + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + checkErroneous(); + } + + @Override + protected void commit(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + transaction.producer.commitTransaction(); + producersPool.add(transaction.producer); + break; + case AT_LEAST_ONCE: + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void recoverAndCommit(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + KafkaTransactionState kafkaTransaction = transaction; + FlinkKafkaProducer<byte[], byte[]> producer = + initTransactionalProducer(kafkaTransaction.transactionalId, false); + producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch); + try { + producer.commitTransaction(); + producer.close(); + } + catch (InvalidTxnStateException ex) { + // That means we have committed this transaction before. 
+ LOG.warn("Encountered error [%s] while recovering transaction [%s]. " + + "Presumably this transaction has been already committed before", + ex, + transaction); + } + break; + case AT_LEAST_ONCE: + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void abort(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + transaction.producer.abortTransaction(); + producersPool.add(transaction.producer); + break; + case AT_LEAST_ONCE: + case NONE: + producersPool.add(transaction.producer); + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void recoverAndAbort(KafkaTransactionState transaction) { + switch (semantic) { + case EXACTLY_ONCE: + FlinkKafkaProducer<byte[], byte[]> producer = + initTransactionalProducer(transaction.transactionalId, false); + producer.resumeTransaction(transaction.producerId, transaction.epoch); + producer.abortTransaction(); + producer.close(); + break; + case AT_LEAST_ONCE: + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + private void acknowledgeMessage() { + pendingRecords.decrementAndGet(); + } + + /** + * Flush pending records. 
+ * @param transaction + */ + private void flush(KafkaTransactionState transaction) throws Exception { + if (transaction.producer != null) { + transaction.producer.flush(); + } + long pendingRecordsCount = pendingRecords.get(); + if (pendingRecordsCount != 0) { + throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount); + } + + // if the flushed requests has errors, we should propagate it also and fail the checkpoint + checkErroneous(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + super.snapshotState(context); + + transactionalIdsState.clear(); + for (String transactionalId : availableTransactionalIds) { + transactionalIdsState.add(transactionalId); + } + for (String transactionalId : usedTransactionalIds) { + transactionalIdsState.add(transactionalId); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + availableTransactionalIds.clear(); + for (int i = 0; i < kafkaProducersPoolSize; i++) { + availableTransactionalIds.add(UUID.randomUUID().toString()); + } + + super.initializeState(context); + + transactionalIdsState = context.getOperatorStateStore().getListState(TRANSACTIONAL_IDS_DESCRIPTOR); + abortPreviousTransactions(transactionalIdsState.get()); + } + + private void abortPreviousTransactions(Iterable<String> transactionalIds) { + for (String transactionalid : transactionalIds) { + try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer = + initTransactionalProducer(transactionalid, false)) { + kafkaProducer.initTransactions(); + } + } + } + + // ----------------------------------- Utilities -------------------------- + + int getTransactionCoordinatorId() { + if (currentTransaction == null || currentTransaction.producer == null) { + throw new IllegalArgumentException(); + } + return currentTransaction.producer.getTransactionCoordinatorId(); + } + + private FlinkKafkaProducer<byte[], byte[]> 
initTransactionalProducer(String transactionalId, boolean registerMetrics) { + producerConfig.put("transactional.id", transactionalId); + return initProducer(registerMetrics); + } + + private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) { + FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig); + + RuntimeContext ctx = getRuntimeContext(); + + if (null != flinkKafkaPartitioner) { + if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) { + ((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions( + getPartitionsByTopic(this.defaultTopicId, producer)); + } + flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); + } + + LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}", + ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId); + + // register Kafka metrics to Flink accumulators + if (registerMetrics && !Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + Map<MetricName, ? extends Metric> metrics = producer.metrics(); + + if (metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Producer implementation does not support metrics"); + } else { + final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer"); + for (Map.Entry<MetricName, ? extends Metric> entry: metrics.entrySet()) { + String name = entry.getKey().name(); + Metric metric = entry.getValue(); + + KafkaMetricMuttableWrapper wrapper = previouslyCreatedMetrics.get(name); + if (wrapper != null) { + wrapper.setKafkaMetric(metric); + } else { + // TODO: somehow merge metrics from all active producers? 
+ wrapper = new KafkaMetricMuttableWrapper(metric); + previouslyCreatedMetrics.put(name, wrapper); + kafkaMetricGroup.gauge(name, wrapper); + } + } + } + } + return producer; + } + + private void checkErroneous() throws Exception { + Exception e = asyncException; + if (e != null) { + // prevent double throwing + asyncException = null; + throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); + } + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + producersPool = new ProducersPool(); + } + + private static Properties getPropertiesFromBrokerList(String brokerList) { + String[] elements = brokerList.split(","); + + // validate the broker addresses + for (String broker: elements) { + NetUtils.getCorrectHostnamePort(broker); + } + + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + return props; + } + + private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) { + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); + + // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks + Collections.sort(partitionsList, new Comparator<PartitionInfo>() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + int[] partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + + return partitions; + } + + /** + * Configuration object returned by the writeToKafkaWithTimestamps() call. 
+ */ + public static class FlinkKafkaProducer011Configuration<IN> extends DataStreamSink<IN> { + + private final FlinkKafkaProducer011 producer; + + private FlinkKafkaProducer011Configuration(DataStream stream, KafkaStreamSink streamSink) { + //noinspection unchecked + super(stream, streamSink); + this.producer = streamSink.kafkaProducer; + } + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.producer.setLogFailuresOnly(logFailuresOnly); + } + + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. + * Timestamps must be positive for Kafka to accept them. + * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. + */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.producer.writeTimestampToKafka = writeTimestampToKafka; + } + } + + /** + * State for handling transactions. 
+ */ + public static class KafkaTransactionState implements Serializable { + + private final transient FlinkKafkaProducer<byte[], byte[]> producer; + + @Nullable + public final String transactionalId; + + public final long producerId; + + public final short epoch; + + public KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) { + this.producer = producer; + this.transactionalId = transactionalId; + this.producerId = producer.getProducerId(); + this.epoch = producer.getEpoch(); + } + + public KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> producer) { + this.producer = producer; + this.transactionalId = null; + this.producerId = -1; + this.epoch = -1; + } + } + + private static class KafkaStreamSink<IN> extends StreamSink<IN> { + private final FlinkKafkaProducer011<IN> kafkaProducer; + + public KafkaStreamSink(FlinkKafkaProducer011<IN> kafkaProducer) { + super(kafkaProducer); + this.kafkaProducer = kafkaProducer; + } + + // TODO: is this used anywhere? --- End diff -- I think this variant is used when using `writeToKafkaWithTimestamps` > Add Apache Kafka 0.11 connector > ------------------------------- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Affects Versions: 1.3.1 > Reporter: Piotr Nowojski > Assignee: Piotr Nowojski > > Kafka 0.11 (it will be released very soon) add supports for transactions. > Thanks to that, Flink might be able to implement Kafka sink supporting > "exactly-once" semantic. API changes and whole transactions support is > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic implementation of existing BucketingSink. 
New > FlinkKafkaProducer011 would: > * upon creation, begin a transaction, store the transaction identifiers in the > state, and write all incoming data to an output Kafka topic using that > transaction; > * on a `snapshotState` call, flush the data and record in the state > that the current transaction is pending commit; > * on `notifyCheckpointComplete`, commit this pending transaction; > * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)