[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136429#comment-16136429 ]
ASF GitHub Bot commented on FLINK-6988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134399156

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.Node;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.common.errors.ProducerFencedException;
    +import org.apache.kafka.common.requests.FindCoordinatorRequest;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.lang.reflect.Field;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Wrapper around KafkaProducer that allows resuming transactions in case of a node failure, which makes it possible
    + * to implement the two-phase commit algorithm for an exactly-once FlinkKafkaProducer.
    + *
    + * <p>For the happy path, usage is exactly the same as for {@link org.apache.kafka.clients.producer.KafkaProducer}.
    + * The user is expected to call:
    + *
    + * <ul>
    + * <li>{@link FlinkKafkaProducer#initTransactions()}</li>
    + * <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
    + * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
    + * <li>{@link FlinkKafkaProducer#flush()}</li>
    + * <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
    + * </ul>
    + *
    + * <p>To actually implement two-phase commit, it must always be possible to commit a transaction after pre-committing
    + * it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure between
    + * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class allows resuming
    + * the interrupted transaction and committing it after a restart:
    + *
    + * <ul>
    + * <li>{@link FlinkKafkaProducer#initTransactions()}</li>
    + * <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
    + * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
    + * <li>{@link FlinkKafkaProducer#flush()}</li>
    + * <li>{@link FlinkKafkaProducer#getProducerId()}</li>
    + * <li>{@link FlinkKafkaProducer#getEpoch()}</li>
    + * <li>node failure... restore producerId and epoch from state</li>
    + * <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
    + * <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
    + * </ul>
    + *
    + * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
    + * as a way to obtain the producerId and epoch counters. This is necessary because otherwise
    + * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions.
    + *
    + * <p>The second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
    + * is that it actually flushes new partitions on {@link FlinkKafkaProducer#flush()} instead of on
    + * {@link FlinkKafkaProducer#commitTransaction()}.
    + *
    + * <p>The last, minor difference is that it allows obtaining the producerId and epoch counters via the
    + * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (which are unfortunately
    + * private fields).
    + *
    + * <p>These changes are compatible with the Kafka 0.11.0 client API, although the Kafka API authors clearly did not
    + * intend to make them possible.
    + *
    + * <p>Internally, this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements the
    + * required changes via the Java Reflection API. It might not be the prettiest solution. An alternative would be to
    + * re-implement the whole Kafka 0.11 client on our own.
    + */
    +public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
    +    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    +
    +    private final KafkaProducer<K, V> kafkaProducer;
    +    @Nullable
    +    private final String transactionalId;
    +
    +    public FlinkKafkaProducer(Properties properties) {
    +        transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
    +        kafkaProducer = new KafkaProducer<>(properties);
    +    }
    +
    +    // -------------------------------- Simple proxy method calls --------------------------------
    +
    +    @Override
    +    public void initTransactions() {
    +        kafkaProducer.initTransactions();
    +    }
    +
    +    @Override
    +    public void beginTransaction() throws ProducerFencedException {
    +        kafkaProducer.beginTransaction();
    +    }
    +
    +    @Override
    +    public void commitTransaction() throws ProducerFencedException {
    +        kafkaProducer.commitTransaction();
    +    }
    +
    +    @Override
    +    public void abortTransaction() throws ProducerFencedException {
    +        kafkaProducer.abortTransaction();
    +    }
    +
    +    @Override
    +    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
    +        kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
    +    }
    +
    +    @Override
    +    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    +        return kafkaProducer.send(record);
    +    }
    +
    +    @Override
    +    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    +        return kafkaProducer.send(record, callback);
    +    }
    +
    +    @Override
    +    public List<PartitionInfo> partitionsFor(String topic) {
    +        return kafkaProducer.partitionsFor(topic);
    +    }
    +
    +    @Override
    +    public Map<MetricName, ? extends Metric> metrics() {
    +        return kafkaProducer.metrics();
    +    }
    +
    +    @Override
    +    public void close() {
    +        kafkaProducer.close();
    +    }
    +
    +    @Override
    +    public void close(long timeout, TimeUnit unit) {
    +        kafkaProducer.close(timeout, unit);
    +    }
    +
    +    // -------------------------------- New methods or methods with changed behaviour --------------------------------
    +
    +    @Override
    +    public void flush() {
    +        kafkaProducer.flush();
    +        if (transactionalId != null) {
    +            flushNewPartitions();
    +        }
    +    }
    +
    +    public void resumeTransaction(long producerId, short epoch) {
    +        if (!(producerId >= 0 && epoch >= 0)) {
    +            throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
    --- End diff --

    Can use `Preconditions.checkState(...)` here.


> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (to be released very soon) adds support for transactions.
> Thanks to that, Flink might be able to implement a Kafka sink supporting
> "exactly-once" semantics. The API changes and the whole transaction support are
> described in
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink.
> The new FlinkKafkaProducer011 would:
> * upon creation, begin a transaction, store the transaction identifiers in the
> state, and write all incoming data to an output Kafka topic using that
> transaction
> * on a `snapshotState` call, flush the data and record in the state that the
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`,
> either abort this pending transaction (if not every participant successfully
> saved the snapshot) or restore and commit it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
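The reviewer's `Preconditions.checkState(...)` suggestion for `resumeTransaction(long, short)` can be sketched as follows. In the connector itself the check would presumably become a one-liner on `org.apache.flink.util.Preconditions`, along the lines of `Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch);`. So that the snippet compiles without Flink on the classpath, `checkState` below is a local stand-in assumed to mimic that method (throw an `IllegalStateException` with a `%s`-formatted message when the condition is false); the class name `ResumeTransactionCheck` and the helper `validateResumeArgs` are hypothetical.

```java
/**
 * Sketch of the reviewer's suggestion for resumeTransaction(long, short).
 * ASSUMPTION: checkState is a local stand-in that mimics
 * org.apache.flink.util.Preconditions#checkState(boolean, String, Object...),
 * so this file compiles without Flink on the classpath.
 */
public class ResumeTransactionCheck {

    // Hypothetical stand-in: throws IllegalStateException with a %s-formatted message if the condition is false.
    static void checkState(boolean condition, String errorMessageTemplate, Object... errorMessageArgs) {
        if (!condition) {
            throw new IllegalStateException(String.format(errorMessageTemplate, errorMessageArgs));
        }
    }

    // Hypothetical helper holding the argument check from the top of resumeTransaction(long, short).
    static void validateResumeArgs(long producerId, short epoch) {
        // The condition is stated positively instead of as if (!(...)) { throw ... }.
        checkState(producerId >= 0 && epoch >= 0,
                "Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch);
    }

    public static void main(String[] args) {
        validateResumeArgs(42L, (short) 3); // valid values: no exception
        try {
            validateResumeArgs(-1L, (short) 0);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Incorrect values for producerId [-1] and epoch [0]
        }
    }
}
```

The behaviour is unchanged (same exception type, same message); the gain is mainly readability, since the invariant is written once as a positive condition rather than as a negated `if` guarding a `throw`.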
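The lifecycle planned for FlinkKafkaProducer011 in the issue description (begin a transaction on creation, pre-commit in `snapshotState`, commit in `notifyCheckpointComplete`, commit or abort on recovery) can be illustrated with a small in-memory simulation. This is a sketch under stated assumptions, not the connector's implementation: `FakeBroker` is a hypothetical stand-in for Kafka, transactions are plain `long` ids, the pre-commit `flush()` is only a comment, and recovery simply aborts every open transaction that is not recorded in the restored state. All class and method names here are hypothetical and do not use the Kafka or Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the two-phase-commit sink lifecycle described in FLINK-6988. */
public class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for Kafka: committed (visible) records plus open, uncommitted transactions. */
    static final class FakeBroker {
        final List<String> committed = new ArrayList<>();
        final Map<Long, List<String>> open = new HashMap<>();
        private long nextTxnId = 0;

        long begin() {
            long id = nextTxnId++;
            open.put(id, new ArrayList<>());
            return id;
        }

        void write(long txnId, String record) {
            open.get(txnId).add(record);
        }

        /** Idempotent: committing an unknown or already committed transaction is a no-op. */
        void commit(long txnId) {
            List<String> records = open.remove(txnId);
            if (records != null) {
                committed.addAll(records);
            }
        }

        void abort(long txnId) {
            open.remove(txnId);
        }
    }

    private final FakeBroker broker;
    private long currentTxn;
    /** Operator "state": checkpointId -> transaction pre-committed for that checkpoint. */
    private final Map<Long, Long> pending = new HashMap<>();

    TwoPhaseCommitSketch(FakeBroker broker, Map<Long, Long> restoredPending) {
        this.broker = broker;
        // Recovery: the restored checkpoint completed, so its pending transactions are committed...
        for (long txn : restoredPending.values()) {
            broker.commit(txn);
        }
        // ...and (simplification) every other open transaction is aborted.
        for (long txn : new ArrayList<>(broker.open.keySet())) {
            broker.abort(txn);
        }
        // Upon creation: begin a fresh transaction for incoming data.
        this.currentTxn = broker.begin();
    }

    void invoke(String record) {
        broker.write(currentTxn, record);
    }

    /** Pre-commit: a real producer would flush() here; the transaction is then recorded as pending. */
    Map<Long, Long> snapshotState(long checkpointId) {
        pending.put(checkpointId, currentTxn);
        currentTxn = broker.begin();
        return new HashMap<>(pending);
    }

    /** Commit phase: every participant saved this checkpoint, so its pending transaction is committed. */
    void notifyCheckpointComplete(long checkpointId) {
        Long txn = pending.remove(checkpointId);
        if (txn != null) {
            broker.commit(txn);
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch(broker, new HashMap<>());
        sink.invoke("a");
        sink.snapshotState(1);
        sink.notifyCheckpointComplete(1);         // "a" becomes visible
        sink.invoke("b");
        Map<Long, Long> checkpoint2 = sink.snapshotState(2);
        sink.invoke("c");                         // written after checkpoint 2
        // Crash before notifyCheckpointComplete(2); checkpoint 2 is assumed to have completed.
        new TwoPhaseCommitSketch(broker, checkpoint2);
        System.out.println(broker.committed);     // [a, b]
    }
}
```

Restoring from checkpoint 2's state commits `b`, while `c` (written after that checkpoint) is aborted. Restoring instead from checkpoint 1's state would commit nothing new and abort both `b` and `c`, which corresponds to the abort branch in the issue description. Making `commit` idempotent matters here, because a restored state may name a transaction that was already committed before the crash.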