
ASF GitHub Bot commented on FLINK-6988:

Github user tzulitai commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,1000 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.util.SerializableObject;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.NetUtils;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.errors.InvalidTxnStateException;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import javax.annotation.Nullable;
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.BlockingDeque;
    +import java.util.concurrent.LinkedBlockingDeque;
    +import java.util.concurrent.atomic.AtomicLong;
    +import static java.util.Objects.requireNonNull;
    + * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
    + * will use {@link Semantic#EXACTLY_ONCE} semantic.
    + *
    + * <p>Implementation note: This producer is a hybrid between a regular 
    + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) 
and a custom operator (b).
    + *
    + * <p>Details about approach (a):
    + *  Because of regular {@link 
org.apache.flink.streaming.api.functions.sink.SinkFunction} APIs limitations, 
    + *  variant do not allow accessing the timestamp attached to the record.
    + *
    + * <p>Details about approach (b):
    + *  Kafka 0.11 supports writing the timestamp attached to a record to 
Kafka. When using the
    + *  {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the 
Kafka producer can access the internal
    + *  record timestamp of the record and write it to Kafka.
    + *
    + * <p>All methods and constructors in this class are marked with the 
approach they are needed for.
    + */
    +public class FlinkKafkaProducer011<IN>
    +           extends TwoPhaseCommitSinkFunction<IN, 
FlinkKafkaProducer011.KafkaTransactionState> {
    +   /**
    +    *  Semantics that can be chosen.
    +    *  <li>{@link #EXACTLY_ONCE}</li>
    +    *  <li>{@link #AT_LEAST_ONCE}</li>
    +    *  <li>{@link #NONE}</li>
    +    */
    +   public enum Semantic {
    +           /**
    +            * Semantic.EXACTLY_ONCE the Flink producer will write all 
messages in a Kafka transaction that will be
    +            * committed to the Kafka on a checkpoint.
    +            *
    +            * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool 
of {@link FlinkKafkaProducer}. Between each
    +            * checkpoint there is created new Kafka transaction, which is 
being committed on
    +            * {@link 
FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete 
notifications are
    +            * running late, {@link FlinkKafkaProducer011} can run out of 
{@link FlinkKafkaProducer}s in the pool. In that
    +            * case any subsequent {@link 
FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
    +            * and {@link FlinkKafkaProducer011} will keep using the {@link 
FlinkKafkaProducer} from previous checkpoint.
    +            * To decrease chances of failing checkpoints there are three 
    +            * <li>decrease number of max concurrent checkpoints</li>
    +            * <li>make checkpoints more reliable (so that they complete 
    +            * <li>increase delay between checkpoints</li>
    +            * <li>increase size of {@link FlinkKafkaProducer}s pool</li>
    +            */
    +           EXACTLY_ONCE,
    +           /**
    +            * Semantic.AT_LEAST_ONCE the Flink producer will wait for all 
outstanding messages in the Kafka buffers
    +            * to be acknowledged by the Kafka producer on a checkpoint.
    +            */
    +           AT_LEAST_ONCE,
    +           /**
    +            * Semantic.NONE means that nothing will be guaranteed. 
Messages can be lost and/or duplicated in case
    +            * of failure.
    +            */
    +           NONE
    +   }
    +   private static final Logger LOG = 
    +   private static final long serialVersionUID = 1L;
    +   /**
    +    * Default number of KafkaProducers in the pool. See {@link 
    +    */
    +   public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
    +   /**
    +    * Configuration key for disabling the metrics reporting.
    +    */
    +   public static final String KEY_DISABLE_METRICS = 
    +   /**
    +    * Descriptor of the transacionalIds list.
    +    */
    +   private static final ListStateDescriptor<String> 
    +           new ListStateDescriptor<>("transactional-ids", 
    +   /**
    +    * Pool of transacional ids backed up in state.
    +    */
    +   private ListState<String> transactionalIdsState;
    +   /**
    +    * Already used transactional ids.
    +    */
    +   private final Set<String> usedTransactionalIds = new HashSet<>();
    +   /**
    +    * Available to use transactional ids.
    +    */
    +   private final BlockingDeque<String> availableTransactionalIds = new 
    +   /**
    +    * User defined properties for the Producer.
    +    */
    +   private final Properties producerConfig;
    +   /**
    +    * The name of the default topic this producer is writing data to.
    +    */
    +   private final String defaultTopicId;
    +   /**
    +    * (Serializable) SerializationSchema for turning objects used with 
Flink into.
    +    * byte[] for Kafka.
    +    */
    +   private final KeyedSerializationSchema<IN> schema;
    +   /**
    +    * User-provided partitioner for assigning an object to a Kafka 
partition for each topic.
    +    */
    +   private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    +   /**
    +    * Partitions of each topic.
    +    */
    +   private final Map<String, int[]> topicPartitionsMap;
    +   /**
    +    * Max number of producers in the pool. If all producers are in use, 
snapshoting state will throw an exception.
    +    */
    +   private final int kafkaProducersPoolSize;
    +   /**
    +    * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
    +    */
    +   private boolean writeTimestampToKafka = false;
    +   /**
    +    * Flag indicating whether to accept failures (and log them), or to 
fail on failures.
    +    */
    +   private boolean logFailuresOnly;
    +   /**
    +    * Semantic chosen for this instance.
    +    */
    +   private Semantic semantic;
    +   /**
    +    * Pool of KafkaProducers objects.
    +    */
    +   private transient ProducersPool producersPool = new ProducersPool();
    +   // -------------------------------- Runtime fields 
    +   /** The callback than handles error propagation or logging callbacks. */
    +   @Nullable
    +   private transient Callback callback;
    +   /** Errors encountered in the async producer are stored here. */
    +   @Nullable
    +   private transient volatile Exception asyncException;
    +   /** Lock for accessing the pending records. */
    +   private final SerializableObject pendingRecordsLock = new 
    +   /** Number of unacknowledged records. */
    +   private final AtomicLong pendingRecords = new AtomicLong();
    +   /** Cache of metrics to replace already registered metrics instead of 
overwriting existing ones. */
    +   private final Map<String, KafkaMetricMuttableWrapper> 
previouslyCreatedMetrics = new HashMap<>();
    +   // ---------------------- "Constructors" for timestamp writing 
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
    +    *
    +    * @param inStream The stream to write to Kafka
    +    * @param topicId ID of the Kafka topic.
    +    * @param serializationSchema User defined serialization schema 
supporting key/value messages
    +    * @param producerConfig Properties with the producer configuration.
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> 
writeToKafkaWithTimestamps(DataStream<IN> inStream,
        String topicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig) {
    +           return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
    +   }
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
    +    *
    +    * @param inStream The stream to write to Kafka
    +    * @param topicId ID of the Kafka topic.
    +    * @param serializationSchema User defined (keyless) serialization 
    +    * @param producerConfig Properties with the producer configuration.
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> 
writeToKafkaWithTimestamps(DataStream<IN> inStream,
        String topicId,
        SerializationSchema<IN> serializationSchema,
        Properties producerConfig) {
    +           return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
    +   }
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
    +    *
    +    *  @param inStream The stream to write to Kafka
    +    *  @param topicId The name of the target topic
    +    *  @param serializationSchema A serializable serialization schema for 
turning user objects into a
    +    *                             kafka-consumable byte[] supporting 
key/value messages
    +    *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only
    +    *                        required argument.
    +    *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> 
writeToKafkaWithTimestamps(DataStream<IN> inStream,
        String topicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        FlinkKafkaPartitioner<IN> customPartitioner) {
    +           return writeToKafkaWithTimestamps(
    +                   inStream,
    +                   topicId,
    +                   serializationSchema,
    +                   producerConfig,
    +                   customPartitioner,
    +                   Semantic.EXACTLY_ONCE,
    +                   DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +   }
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
    +    *
    +    *  @param inStream The stream to write to Kafka
    +    *  @param topicId The name of the target topic
    +    *  @param serializationSchema A serializable serialization schema for 
turning user objects into a
    +    *                             kafka-consumable byte[] supporting 
key/value messages
    +    *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only
    +    *                        required argument.
    +    *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
    +    *  @param semantic Defines semantic that will be used by this producer 
(see {@link Semantic}).
    +    *  @param kafkaProducersPoolSize Overwrite default KafkaProducers pool 
size (see {@link Semantic#EXACTLY_ONCE}).
    +    */
    +   public static <IN> FlinkKafkaProducer011Configuration<IN> 
writeToKafkaWithTimestamps(DataStream<IN> inStream,
        String topicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        FlinkKafkaPartitioner<IN> customPartitioner,
        Semantic semantic,
        int kafkaProducersPoolSize) {
    +           GenericTypeInfo<Object> objectTypeInfo = new 
    +           FlinkKafkaProducer011<IN> kafkaProducer =
    +                   new FlinkKafkaProducer011<>(
    +                           topicId,
    +                           serializationSchema,
    +                           producerConfig,
    +                           customPartitioner,
    +                           semantic,
    +                           kafkaProducersPoolSize);
    +           KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer);
    +           SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.11.x", objectTypeInfo, streamSink);
    +           return new FlinkKafkaProducer011Configuration<>(transformation, 
    +   }
    +   // ---------------------- Regular constructors w/o timestamp support  
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * @param brokerList
    +    *                      Comma separated addresses of the brokers
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined (keyless) serialization schema.
    +    */
    +   public FlinkKafkaProducer011(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
    +           this(topicId, new 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
    +   }
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined (keyless) serialization schema.
    +    * @param producerConfig
    +    *                      Properties with the producer configuration.
    +    */
    +   public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
    +           this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
    +   }
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * @param topicId The topic to write data to
    +    * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
    +    * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
    +    * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
    +    */
    +   public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> 
customPartitioner) {
    +           this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
    +   }
    +   // ------------------- Key/Value serialization schema constructors 
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * @param brokerList
    +    *                      Comma separated addresses of the brokers
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined serialization schema supporting 
key/value messages
    +    */
    +   public FlinkKafkaProducer011(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
    +           this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
    +   }
    +   /**
    +    * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
    +    * the topic.
    +    *
    +    * @param topicId
    +    *                      ID of the Kafka topic.
    +    * @param serializationSchema
    +    *                      User defined serialization schema supporting 
key/value messages
    +    * @param producerConfig
    +    *                      Properties with the producer configuration.
    +    */
    +   public FlinkKafkaProducer011(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
    +           this(topicId, serializationSchema, producerConfig, new 
    +   }
    +   /**
    +    * The main constructor for creating a FlinkKafkaProducer.
    +    *
    +    * <p>This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
    +    *
    +    * @param defaultTopicId The default topic to write data to
    +    * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
    +    * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
    +    * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +    */
    +   public FlinkKafkaProducer011(String defaultTopicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
    +           this(
    +                   defaultTopicId,
    +                   serializationSchema,
    +                   producerConfig,
    +                   customPartitioner,
    +                   Semantic.EXACTLY_ONCE,
    +                   DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +   }
    +   /**
    +    * The main constructor for creating a FlinkKafkaProducer.
    +    *
    +    * <p>This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
    +    *
    +    * @param defaultTopicId The default topic to write data to
    +    * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
    +    * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
    +    * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +    * @param semantic Defines semantic that will be used by this producer 
(see {@link Semantic}).
    +    * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool 
size (see {@link Semantic#EXACTLY_ONCE}).
    +    */
    +   public FlinkKafkaProducer011(
    +                   String defaultTopicId,
    +                   KeyedSerializationSchema<IN> serializationSchema,
    +                   Properties producerConfig,
    +                   FlinkKafkaPartitioner<IN> customPartitioner,
    +                   Semantic semantic,
    +                   int kafkaProducersPoolSize) {
    +           super(
    +                   TypeInformation.of(KafkaTransactionState.class),
    +                   TypeInformation.of(new 
TypeHint<List<KafkaTransactionState>>() {}));
    +           requireNonNull(defaultTopicId, "TopicID not set");
    +           requireNonNull(serializationSchema, "serializationSchema not 
    +           requireNonNull(producerConfig, "producerConfig not set");
    +           ClosureCleaner.clean(customPartitioner, true);
    +           ClosureCleaner.ensureSerializable(serializationSchema);
    +           this.defaultTopicId = defaultTopicId;
    +           this.schema = serializationSchema;
    +           this.producerConfig = producerConfig;
    +           this.flinkKafkaPartitioner = customPartitioner;
    +           this.semantic = semantic;
    +           this.kafkaProducersPoolSize = kafkaProducersPoolSize;
    --- End diff --
    Check for negative / 0 poll sizes.

> Add Apache Kafka 0.11 connector
> -------------------------------
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
> Kafka 0.11 (it will be released very soon) add supports for transactions. 
> Thanks to that, Flink might be able to implement Kafka sink supporting 
> "exactly-once" semantic. API changes and whole transactions support is 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic implementation of existing BucketingSink. New 
> FlinkKafkaProducer011 would 
> * upon creation begin transaction, store transaction identifiers into the 
> state and would write all incoming data to an output Kafka topic using that 
> transaction
> * on `snapshotState` call, it would flush the data and write in state 
> information that current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of crash between `snapshotState` and `notifyCheckpointComplete` we 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 

This message was sent by Atlassian JIRA

Reply via email to