[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136435#comment-16136435 ]

ASF GitHub Bot commented on FLINK-6988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134398934
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -0,0 +1,1000 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.util.SerializableObject;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
    +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.errors.InvalidTxnStateException;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.BlockingDeque;
    +import java.util.concurrent.LinkedBlockingDeque;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the producer
    + * will use the {@link Semantic#EXACTLY_ONCE} semantic.
    + *
    + * <p>Implementation note: This producer is a hybrid between a regular
    + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b).
    + *
    + * <p>Details about approach (a):
    + *  Because of limitations of the regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} APIs, this
    + *  variant does not allow accessing the timestamp attached to the record.
    + *
    + * <p>Details about approach (b):
    + *  Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the
    + *  {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal
    + *  record timestamp of the record and write it to Kafka.
    + *
    + * <p>All methods and constructors in this class are marked with the approach they are needed for.
    + */
    +public class FlinkKafkaProducer011<IN>
    +           extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState> {
    +
    +   /**
    +    *  Semantics that can be chosen.
    +    *  <li>{@link #EXACTLY_ONCE}</li>
    +    *  <li>{@link #AT_LEAST_ONCE}</li>
    +    *  <li>{@link #NONE}</li>
    +    */
    +   public enum Semantic {
    +           /**
    +            * Semantic.EXACTLY_ONCE: the Flink producer will write all messages in a Kafka transaction that will be
    +            * committed to Kafka on a checkpoint.
    +            *
    +            * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each
    +            * checkpoint a new Kafka transaction is created, which is committed on
    +            * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
    +            * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
    +            * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
    +            * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint.
    +            * To decrease the chances of failing checkpoints there are four options:
    +            * <li>decrease the number of max concurrent checkpoints</li>
    +            * <li>make checkpoints more reliable (so that they complete faster)</li>
    +            * <li>increase the delay between checkpoints</li>
    +            * <li>increase the size of the {@link FlinkKafkaProducer}s pool</li>
    +            */
    +           EXACTLY_ONCE,
    +           /**
    +            * Semantic.AT_LEAST_ONCE: the Flink producer will wait for all outstanding messages in the Kafka buffers
    +            * to be acknowledged by the Kafka producer on a checkpoint.
    +            */
    +           AT_LEAST_ONCE,
    +           /**
    +            * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
    +            * of failure.
    +            */
    +           NONE
    +   }
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer011.class);
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   /**
    +    * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
    +    */
    +   public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
    +
    +   /**
    +    * Configuration key for disabling the metrics reporting.
    +    */
    +   public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    +
    +   /**
    +    * Descriptor of the transactionalIds list.
    +    */
    +   private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR =
    +           new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class));
    +
    +   /**
    +    * Pool of transactional ids backed up in state.
    +    */
    +   private ListState<String> transactionalIdsState;
    +
    +   /**
    +    * Already used transactional ids.
    +    */
    +   private final Set<String> usedTransactionalIds = new HashSet<>();
    +
    +   /**
    +    * Transactional ids that are available for use.
    +    */
    +   private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
    +
    +   /**
    +    * User defined properties for the Producer.
    +    */
    +   private final Properties producerConfig;
    +
    +   /**
    +    * The name of the default topic this producer is writing data to.
    +    */
    +   private final String defaultTopicId;
    +
    +   /**
    +    * (Serializable) SerializationSchema for turning objects used with Flink into
    +    * byte[] for Kafka.
    +    */
    +   private final KeyedSerializationSchema<IN> schema;
    +
    +   /**
    +    * User-provided partitioner for assigning an object to a Kafka partition for each topic.
    +    */
    +   private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    +
    +   /**
    +    * Partitions of each topic.
    +    */
    +   private final Map<String, int[]> topicPartitionsMap;
    +
    +   /**
    +    * Max number of producers in the pool. If all producers are in use, snapshotting state will throw an exception.
    +    */
    +   private final int kafkaProducersPoolSize;
    +
    +   /**
    +    * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
    +    */
    +   private boolean writeTimestampToKafka = false;
    +
    +   /**
    +    * Flag indicating whether to accept failures (and log them), or to fail on failures.
    +    */
    +   private boolean logFailuresOnly;
    +
    +   /**
    +    * Semantic chosen for this instance.
    +    */
    +   private Semantic semantic;
    +
    +   /**
    +    * Pool of KafkaProducers objects.
    +    */
    +   private transient ProducersPool producersPool = new ProducersPool();
    +
    +   // -------------------------------- Runtime fields ------------------------------------------
    +
    +   /** The callback that handles error propagation or logging. */
    +   @Nullable
    +   private transient Callback callback;
    +
    +   /** Errors encountered in the async producer are stored here. */
    +   @Nullable
    +   private transient volatile Exception asyncException;
    +
    +   /** Lock for accessing the pending records. */
    +   private final SerializableObject pendingRecordsLock = new SerializableObject();
    +
    +   /** Number of unacknowledged records. */
    +   private final AtomicLong pendingRecords = new AtomicLong();
    +
    +   /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
    +   private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
    +
    +   // ---------------------- "Constructors" for timestamp writing ------------------
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +    *
    +    * @param inStream The stream to write to Kafka
    +    * @param topicId ID of the Kafka topic.
    +    * @param serializationSchema User defined serialization schema supporting key/value messages
    +    * @param producerConfig Properties with the producer configuration.
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +                   DataStream<IN> inStream,
    +                   String topicId,
    +                   KeyedSerializationSchema<IN> serializationSchema,
    +                   Properties producerConfig) {
    +           return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
    +   }
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +    *
    +    * @param inStream The stream to write to Kafka
    +    * @param topicId ID of the Kafka topic.
    +    * @param serializationSchema User defined (keyless) serialization schema.
    +    * @param producerConfig Properties with the producer configuration.
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +                   DataStream<IN> inStream,
    +                   String topicId,
    +                   SerializationSchema<IN> serializationSchema,
    +                   Properties producerConfig) {
    +           return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
    +   }
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +    *
    +    *  @param inStream The stream to write to Kafka
    +    *  @param topicId The name of the target topic
    +    *  @param serializationSchema A serializable serialization schema for turning user objects into a
    +    *                             kafka-consumable byte[] supporting key/value messages
    +    *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
    +    *                        required argument.
    +    *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +                   DataStream<IN> inStream,
    +                   String topicId,
    +                   KeyedSerializationSchema<IN> serializationSchema,
    +                   Properties producerConfig,
    +                   FlinkKafkaPartitioner<IN> customPartitioner) {
    +           return writeToKafkaWithTimestamps(
    +                   inStream,
    +                   topicId,
    +                   serializationSchema,
    +                   producerConfig,
    +                   customPartitioner,
    +                   Semantic.EXACTLY_ONCE,
    +                   DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +   }
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +    *
    +    *  @param inStream The stream to write to Kafka
    +    *  @param topicId The name of the target topic
    +    *  @param serializationSchema A serializable serialization schema for turning user objects into a
    +    *                             kafka-consumable byte[] supporting key/value messages
    +    *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
    +    *                        required argument.
    +    *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
    +    *  @param semantic Defines the semantic that will be used by this producer (see {@link Semantic}).
    +    *  @param kafkaProducersPoolSize Overwrites the default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +                   DataStream<IN> inStream,
    +                   String topicId,
    +                   KeyedSerializationSchema<IN> serializationSchema,
    +                   Properties producerConfig,
    +                   FlinkKafkaPartitioner<IN> customPartitioner,
    +                   Semantic semantic,
    +                   int kafkaProducersPoolSize) {
    +
    +           GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
    +           FlinkKafkaProducer011<IN> kafkaProducer =
    +                   new FlinkKafkaProducer011<>(
    +                           topicId,
    +                           serializationSchema,
    +                           producerConfig,
    +                           customPartitioner,
    +                           semantic,
    +                           kafkaProducersPoolSize);
    +           KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer);
    +           SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.11.x", objectTypeInfo, streamSink);
    +           return new FlinkKafkaProducer011Configuration<>(transformation, streamSink);
    +   }
    +
    +   // ---------------------- Regular constructors w/o timestamp support ------------------
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * @param brokerList
    +    *                      Comma separated addresses of the brokers
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined (keyless) serialization schema.
    +    */
    +   public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
    +           this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
    +   }
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined (keyless) serialization schema.
    +    * @param producerConfig
    +    *                      Properties with the producer configuration.
    +    */
    +   public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
    +           this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
    +   }
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * @param topicId The topic to write data to
    +    * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
    +    * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
    +    */
    +   public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
    +           this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
    +   }
    +
    +   // ------------------- Key/Value serialization schema constructors ----------------------
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * @param brokerList
    +    *                      Comma separated addresses of the brokers
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined serialization schema supporting key/value messages
    +    */
    +   public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
    +           this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
    +   }
    +
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +    * the topic.
    +    *
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined serialization schema supporting key/value messages
    +    * @param producerConfig
    +    *                      Properties with the producer configuration.
    +    */
    +   public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
    +           this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
    +   }
    +
    +   /**
    +    * The main constructor for creating a FlinkKafkaProducer.
    +    *
    +    * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
    +    *
    +    * @param defaultTopicId The default topic to write data to
    +    * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
    +    * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +    */
    +   public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
    +           this(
    +                   defaultTopicId,
    +                   serializationSchema,
    +                   producerConfig,
    +                   customPartitioner,
    +                   Semantic.EXACTLY_ONCE,
    +                   DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +   }
    +
    +   /**
    +    * The main constructor for creating a FlinkKafkaProducer.
    +    *
    +    * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
    +    *
    +    * @param defaultTopicId The default topic to write data to
    +    * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
    +    * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +    * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +    * @param semantic Defines the semantic that will be used by this producer (see {@link Semantic}).
    +    * @param kafkaProducersPoolSize Overwrites the default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
    +    */
    +   public FlinkKafkaProducer011(
    +                   String defaultTopicId,
    +                   KeyedSerializationSchema<IN> serializationSchema,
    +                   Properties producerConfig,
    +                   FlinkKafkaPartitioner<IN> customPartitioner,
    +                   Semantic semantic,
    +                   int kafkaProducersPoolSize) {
    +           super(
    +                   TypeInformation.of(KafkaTransactionState.class),
    +                   TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {}));
    +
    +           requireNonNull(defaultTopicId, "TopicID not set");
    +           requireNonNull(serializationSchema, "serializationSchema not set");
    +           requireNonNull(producerConfig, "producerConfig not set");
    +           ClosureCleaner.clean(customPartitioner, true);
    +           ClosureCleaner.ensureSerializable(serializationSchema);
    +
    +           this.defaultTopicId = defaultTopicId;
    +           this.schema = serializationSchema;
    +           this.producerConfig = producerConfig;
    +           this.flinkKafkaPartitioner = customPartitioner;
    +           this.semantic = semantic;
    +           this.kafkaProducersPoolSize = kafkaProducersPoolSize;
    +
    +           // set the producer configuration properties for kafka record key value serializers.
    +           if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
    +                   this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
    +           } else {
    +                   LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    +           }
    +
    +           if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +                   this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
    +           } else {
    +                   LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    +           }
    +
    +           // eagerly ensure that bootstrap servers are set.
    +           if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
    +                   throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
    +           }
    +
    +           this.topicPartitionsMap = new HashMap<>();
    +   }
    +
    +   // ---------------------------------- Properties --------------------------
    +
    +   /**
    +    * Defines whether the producer should fail on errors, or only log them.
    +    * If this is set to true, exceptions will only be logged; if set to false,
    +    * exceptions will eventually be thrown and cause the streaming program to
    +    * fail (and enter recovery).
    +    *
    +    * <p>This method is only accessible for approach (a) (see above).
    +    *
    +    * @param logFailuresOnly The flag to indicate logging-only on exceptions.
    +    */
    +   public void setLogFailuresOnly(boolean logFailuresOnly) {
    +           this.logFailuresOnly = logFailuresOnly;
    +   }
    +
    +   // ----------------------------------- Utilities --------------------------
    +
    +   /**
    +    * Initializes the connection to Kafka.
    +    *
    +    * <p>This method is used for approach (a) (see above).
    +    */
    +   @Override
    +   public void open(Configuration configuration) throws Exception {
    +           if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    +                   LOG.warn(String.format("Using [%s] semantic, but checkpointing is not enabled. Switching to [%s] semantic.", semantic, Semantic.NONE));
    +                   semantic = Semantic.NONE;
    +           }
    +
    +           if (logFailuresOnly) {
    +                   callback = new Callback() {
    +                           @Override
    +                           public void onCompletion(RecordMetadata metadata, Exception e) {
    +                                   if (e != null) {
    +                                           LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
    +                                   }
    +                                   acknowledgeMessage();
    +                           }
    +                   };
    +           }
    +           else {
    +                   callback = new Callback() {
    +                           @Override
    +                           public void onCompletion(RecordMetadata metadata, Exception exception) {
    +                                   if (exception != null && asyncException == null) {
    +                                           asyncException = exception;
    +                                   }
    +                                   acknowledgeMessage();
    +                           }
    +                   };
    +           }
    +
    +           super.open(configuration);
    +   }
    +
    +   @Override
    +   public void invoke(KafkaTransactionState transaction, IN next) throws Exception {
    +           invokeInternal(transaction, next, Long.MAX_VALUE);
    +   }
    +
    +   private void invokeInternal(KafkaTransactionState transaction, IN next, long elementTimestamp) throws Exception {
    +           checkErroneous();
    +
    +           byte[] serializedKey = schema.serializeKey(next);
    +           byte[] serializedValue = schema.serializeValue(next);
    +           String targetTopic = schema.getTargetTopic(next);
    +           if (targetTopic == null) {
    +                   targetTopic = defaultTopicId;
    +           }
    +
    +           Long timestamp = null;
    +           if (this.writeTimestampToKafka) {
    +                   timestamp = elementTimestamp;
    +           }
    +
    +           ProducerRecord<byte[], byte[]> record;
    +           int[] partitions = topicPartitionsMap.get(targetTopic);
    +           if (null == partitions) {
    +                   partitions = getPartitionsByTopic(targetTopic, transaction.producer);
    +                   topicPartitionsMap.put(targetTopic, partitions);
    +           }
    +           if (flinkKafkaPartitioner == null) {
    +                   record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    +           } else {
    +                   record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
    +           }
    +           }
    +           pendingRecords.incrementAndGet();
    +           transaction.producer.send(record, callback);
    +   }
    +
    +   @Override
    +   public void close() throws Exception {
    +           if (currentTransaction != null) {
    +                   // to avoid exceptions on aborting transactions with some pending records
    +                   flush(currentTransaction);
    +           }
    +           try {
    +                   super.close();
    +           }
    +           catch (Exception e) {
    +                   asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
    +           }
    +           try {
    +                   producersPool.close();
    +           }
    +           catch (Exception e) {
    +                   asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
    +           }
    +           // make sure we propagate pending errors
    +           checkErroneous();
    +   }
    +
    +   // ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +   @Override
    +   protected KafkaTransactionState beginTransaction() throws Exception {
    +           switch (semantic) {
    +                   case EXACTLY_ONCE:
    +                           FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
    +                           if (producer == null) {
    +                                   String transactionalId = availableTransactionalIds.poll();
    +                                   if (transactionalId == null) {
    +                                           throw new Exception(
    +                                                   "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
    +                                   }
    +                                   usedTransactionalIds.add(transactionalId);
    +                                   producer = initTransactionalProducer(transactionalId, true);
    +                                   producer.initTransactions();
    +                           }
    +                           producer.beginTransaction();
    +                           return new KafkaTransactionState(producer.getTransactionalId(), producer);
    +                   case AT_LEAST_ONCE:
    +                   case NONE:
    +                           // Do not create a new producer on each beginTransaction() if it is not necessary
    +                           if (currentTransaction != null && currentTransaction.producer != null) {
    +                                   return new KafkaTransactionState(currentTransaction.producer);
    +                           }
    +                           return new KafkaTransactionState(initProducer(true));
    +                   default:
    +                           throw new UnsupportedOperationException("Not implemented semantic");
    +           }
    +   }
    +
    +   @Override
    +   protected void preCommit(KafkaTransactionState transaction) throws Exception {
    +           switch (semantic) {
    +                   case EXACTLY_ONCE:
    +                   case AT_LEAST_ONCE:
    +                           flush(transaction);
    +                           break;
    +                   case NONE:
    +                           break;
    +                   default:
    +                           throw new UnsupportedOperationException("Not implemented semantic");
    +           }
    +           checkErroneous();
    +   }
    +
    +   @Override
    +   protected void commit(KafkaTransactionState transaction) {
    +           switch (semantic) {
    +                   case EXACTLY_ONCE:
    +                           transaction.producer.commitTransaction();
    +                           producersPool.add(transaction.producer);
    +                           break;
    +                   case AT_LEAST_ONCE:
    +                   case NONE:
    +                           break;
    +                   default:
    +                           throw new UnsupportedOperationException("Not implemented semantic");
    +           }
    +   }
    +
    +   @Override
    +   protected void recoverAndCommit(KafkaTransactionState transaction) {
    +           switch (semantic) {
    +                   case EXACTLY_ONCE:
    +                           KafkaTransactionState kafkaTransaction = transaction;
    +                           FlinkKafkaProducer<byte[], byte[]> producer =
    +                                   initTransactionalProducer(kafkaTransaction.transactionalId, false);
    +                           producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
    +                           try {
    +                                   producer.commitTransaction();
    +                                   producer.close();
    +                           }
    +                           catch (InvalidTxnStateException ex) {
    +                                   // That means we have committed this transaction before.
    +                                   LOG.warn("Encountered error [{}] while recovering transaction [{}]. " +
    +                                           "Presumably this transaction has already been committed before",
    +                                           ex,
    +                                           transaction);
    +                           }
    +                           break;
    +                   case AT_LEAST_ONCE:
    +                   case NONE:
    +                           break;
    +                   default:
    +                           throw new UnsupportedOperationException("Not implemented semantic");
    +           }
    +   }
    +
    +   @Override
    +   protected void abort(KafkaTransactionState transaction) {
    +           switch (semantic) {
    +                   case EXACTLY_ONCE:
    +                           transaction.producer.abortTransaction();
    +                           producersPool.add(transaction.producer);
    +                           break;
    +                   case AT_LEAST_ONCE:
    +                   case NONE:
    +                           producersPool.add(transaction.producer);
    +                           break;
    +                   default:
    +                           throw new UnsupportedOperationException("Not implemented semantic");
    +           }
    +   }
    +
    +   @Override
    +   protected void recoverAndAbort(KafkaTransactionState transaction) {
    +           switch (semantic) {
    +                   case EXACTLY_ONCE:
    +                           FlinkKafkaProducer<byte[], byte[]> producer =
    +                                   initTransactionalProducer(transaction.transactionalId, false);
    +                           producer.resumeTransaction(transaction.producerId, transaction.epoch);
    +                           producer.abortTransaction();
    +                           producer.close();
    +                           break;
    +                   case AT_LEAST_ONCE:
    +                   case NONE:
    +                           break;
    +                   default:
    +                           throw new UnsupportedOperationException("Not implemented semantic");
    +           }
    +   }
    +
    +   private void acknowledgeMessage() {
    +           pendingRecords.decrementAndGet();
    +   }
    +
    +   /**
    +    * Flush pending records.
    +    * @param transaction the transaction whose producer should be flushed
    +    */
    +   private void flush(KafkaTransactionState transaction) throws Exception {
    +           if (transaction.producer != null) {
    +                   transaction.producer.flush();
    +           }
    +           long pendingRecordsCount = pendingRecords.get();
    +           if (pendingRecordsCount != 0) {
    +                   throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
    +           }
    +
    +           // if the flushed requests have errors, we should propagate them as well and fail the checkpoint
    +           checkErroneous();
    +   }
    +
    +   @Override
    +   public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +           super.snapshotState(context);
    +
    +           transactionalIdsState.clear();
    +           for (String transactionalId : availableTransactionalIds) {
    +                   transactionalIdsState.add(transactionalId);
    +           }
    +           for (String transactionalId : usedTransactionalIds) {
    +                   transactionalIdsState.add(transactionalId);
    +           }
    +   }
    +
    +   @Override
    +   public void initializeState(FunctionInitializationContext context) throws Exception {
    +           availableTransactionalIds.clear();
    +           for (int i = 0; i < kafkaProducersPoolSize; i++) {
    +                   availableTransactionalIds.add(UUID.randomUUID().toString());
    +           }
    +
    +           super.initializeState(context);
    +
    +           transactionalIdsState = context.getOperatorStateStore().getListState(TRANSACTIONAL_IDS_DESCRIPTOR);
    +           abortPreviousTransactions(transactionalIdsState.get());
    +   }
    +
    +   private void abortPreviousTransactions(Iterable<String> transactionalIds) {
    +           for (String transactionalId : transactionalIds) {
    +                   try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
    +                                   initTransactionalProducer(transactionalId, false)) {
    +                           kafkaProducer.initTransactions();
    +                   }
    +           }
    +   }
    +
    +   // ----------------------------------- Utilities --------------------------
    +
    +   int getTransactionCoordinatorId() {
    +           if (currentTransaction == null || currentTransaction.producer == null) {
    +                   throw new IllegalArgumentException();
    +           }
    +           return currentTransaction.producer.getTransactionCoordinatorId();
    +   }
    +
    +   private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
    +           producerConfig.put("transactional.id", transactionalId);
    +           return initProducer(registerMetrics);
    +   }
    +
    +   private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
    +           FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
    +
    +           RuntimeContext ctx = getRuntimeContext();
    +
    +           if (null != flinkKafkaPartitioner) {
    +                   if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
    +                           ((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
    +                                   getPartitionsByTopic(this.defaultTopicId, producer));
    +                   }
    +                   flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    +           }
    +
    +           LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
    +                   ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
    +
    +           // register Kafka metrics to Flink accumulators
    +           if (registerMetrics && !Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
    +                   Map<MetricName, ? extends Metric> metrics = producer.metrics();
    +
    +                   if (metrics == null) {
    +                           // MapR's Kafka implementation returns null here.
    +                           LOG.info("Producer implementation does not support metrics");
    +                   } else {
    +                           final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
    +                           for (Map.Entry<MetricName, ? extends Metric> entry: metrics.entrySet()) {
    +                                   String name = entry.getKey().name();
    +                                   Metric metric = entry.getValue();
    +
    +                                   KafkaMetricMuttableWrapper wrapper = previouslyCreatedMetrics.get(name);
    +                                   if (wrapper != null) {
    +                                           wrapper.setKafkaMetric(metric);
    +                                   } else {
    +                                           // TODO: somehow merge metrics from all active producers?
    +                                           wrapper = new KafkaMetricMuttableWrapper(metric);
    +                                           previouslyCreatedMetrics.put(name, wrapper);
    +                                           kafkaMetricGroup.gauge(name, wrapper);
    +                                   }
    +                           }
    +                   }
    +           }
    +           return producer;
    +   }
    +
    +   private void checkErroneous() throws Exception {
    +           Exception e = asyncException;
    +           if (e != null) {
    +                   // prevent double throwing
    +                   asyncException = null;
    +                   throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
    +           }
    +   }
    +
    +   private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
    +           in.defaultReadObject();
    +           producersPool = new ProducersPool();
    +   }
    +
    +   private static Properties getPropertiesFromBrokerList(String brokerList) {
    +           String[] elements = brokerList.split(",");
    +
    +           // validate the broker addresses
    +           for (String broker: elements) {
    +                   NetUtils.getCorrectHostnamePort(broker);
    +           }
    +
    +           Properties props = new Properties();
    +           props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    +           return props;
    +   }
    +
    +   private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
    +           // the fetched list is immutable, so we're creating a mutable copy in order to sort it
    +           List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
    +
    +           // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
    +           Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
    +                   @Override
    +                   public int compare(PartitionInfo o1, PartitionInfo o2) {
    +                           return Integer.compare(o1.partition(), o2.partition());
    +                   }
    +           });
    +
    +           int[] partitions = new int[partitionsList.size()];
    +           for (int i = 0; i < partitions.length; i++) {
    +                   partitions[i] = partitionsList.get(i).partition();
    +           }
    +
    +           return partitions;
    +   }
    +
    +   /**
    +    * Configuration object returned by the writeToKafkaWithTimestamps() call.
    +    */
    +   public static class FlinkKafkaProducer011Configuration<IN> extends DataStreamSink<IN> {
    +
    +           private final FlinkKafkaProducer011 producer;
    +
    +           private FlinkKafkaProducer011Configuration(DataStream stream, KafkaStreamSink streamSink) {
    +                   //noinspection unchecked
    +                   super(stream, streamSink);
    +                   this.producer = streamSink.kafkaProducer;
    +           }
    +
    +           /**
    +            * Defines whether the producer should fail on errors, or only log them.
    +            * If this is set to true, exceptions will only be logged; if set to false,
    +            * exceptions will eventually be thrown and cause the streaming program to
    +            * fail (and enter recovery).
    +            *
    +            * @param logFailuresOnly The flag to indicate logging-only on exceptions.
    +            */
    +           public void setLogFailuresOnly(boolean logFailuresOnly) {
    +                   this.producer.setLogFailuresOnly(logFailuresOnly);
    +           }
    +
    +           /**
    +            * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
    +            * Timestamps must be positive for Kafka to accept them.
    +            *
    +            * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
    +            */
    +           public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
    +                   this.producer.writeTimestampToKafka = writeTimestampToKafka;
    +           }
    +           }
    +   }
    +
    +   /**
    +    * State for handling transactions.
    +    */
    +   public static class KafkaTransactionState implements Serializable {
    +
    +           private final transient FlinkKafkaProducer<byte[], byte[]> producer;
    +
    +           @Nullable
    +           public final String transactionalId;
    +
    +           public final long producerId;
    +
    +           public final short epoch;
    +
    +           public KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) {
    +                   this.producer = producer;
    +                   this.transactionalId = transactionalId;
    +                   this.producerId = producer.getProducerId();
    +                   this.epoch = producer.getEpoch();
    +           }
    +
    +           public KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> producer) {
    +                   this.producer = producer;
    +                   this.transactionalId = null;
    +                   this.producerId = -1;
    +                   this.epoch = -1;
    +           }
    +   }
    +
    +   private static class KafkaStreamSink<IN> extends StreamSink<IN> {
    +           private final FlinkKafkaProducer011<IN> kafkaProducer;
    +
    +           public KafkaStreamSink(FlinkKafkaProducer011<IN> kafkaProducer) {
    +                   super(kafkaProducer);
    +                   this.kafkaProducer = kafkaProducer;
    +           }
    +
    +           // TODO: is this used anywhere?
    --- End diff --
    
    I think this variant is used when using `writeToKafkaWithTimestamps`
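    
    For context, a minimal usage sketch of that path (imports omitted; the topic name, broker address and sample elements are made up, and checkpointing is assumed to be enabled so the EXACTLY_ONCE default can take effect):
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        DataStream<String> stream = env.fromElements("a", "b", "c");
    
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
    
        // goes through writeToKafkaWithTimestamps and therefore through KafkaStreamSink (approach (b))
        FlinkKafkaProducer011.FlinkKafkaProducer011Configuration<String> sinkConfig =
                FlinkKafkaProducer011.writeToKafkaWithTimestamps(stream, "example-topic", new SimpleStringSchema(), props);
        sinkConfig.setWriteTimestampToKafka(true);
        env.execute("FlinkKafkaProducer011 timestamp example");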


> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (to be released very soon) adds support for transactions. Thanks to 
> that, Flink might be able to implement a Kafka sink supporting the 
> "exactly-once" semantic. The API changes and the transaction support as a whole 
> are described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would 
> * upon creation begin a transaction, store the transaction identifiers in the 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it (see the sketch below). 
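
A self-contained sketch of the checkpoint-to-commit flow described above. The hook order mirrors the TwoPhaseCommitSinkFunction methods visible in the diff, but the "transaction" here is a plain in-memory buffer rather than a real Kafka transaction, so this is only an illustration, not the connector's implementation:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class TwoPhaseCommitFlowSketch {
        static final class Txn { final Deque<String> buffered = new ArrayDeque<>(); }

        private Txn current = beginTransaction();   // a transaction is begun up front
        private Txn pending;                        // transaction waiting for the checkpoint to complete

        Txn beginTransaction() { return new Txn(); }

        void invoke(String record) { current.buffered.add(record); }   // all incoming data goes into the open transaction

        void snapshotState() {                      // checkpoint barrier: flush and remember the transaction as pending
            pending = current;
            current = beginTransaction();           // new records go into a fresh transaction
        }

        void notifyCheckpointComplete() {           // checkpoint acknowledged: commit the pending transaction
            System.out.println("committing " + pending.buffered);
            pending = null;
        }

        public static void main(String[] args) {
            TwoPhaseCommitFlowSketch sink = new TwoPhaseCommitFlowSketch();
            sink.invoke("a");
            sink.invoke("b");
            sink.snapshotState();                   // "a" and "b" become pending
            sink.invoke("c");                       // "c" belongs to the next transaction
            sink.notifyCheckpointComplete();        // commits "a" and "b"
            // A crash between snapshotState() and notifyCheckpointComplete() would be handled
            // on restore by either committing or aborting the pending transaction.
        }
    }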



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
