[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136429#comment-16136429 ]

ASF GitHub Bot commented on FLINK-6988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134399156
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.Node;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.common.errors.ProducerFencedException;
    +import org.apache.kafka.common.requests.FindCoordinatorRequest;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.lang.reflect.Field;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Wrapper around KafkaProducer that allows resuming transactions after a node failure, which makes it
    + * possible to implement the two-phase commit algorithm behind the exactly-once FlinkKafkaProducer.
    + *
    + * <p>On the happy path, usage is exactly the same as with {@link org.apache.kafka.clients.producer.KafkaProducer}.
    + * The user is expected to call:
    + *
    + * <ul>
    + *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
    + *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
    + *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
    + *     <li>{@link FlinkKafkaProducer#flush()}</li>
    + *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
    + * </ul>
    + *
    + * <p>To actually implement two-phase commit, it must always be possible to commit a transaction after
    + * pre-committing it (here, the pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure
    + * between {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class
    + * allows resuming the interrupted transaction and committing it after a restart:
    + *
    + * <ul>
    + *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
    + *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
    + *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
    + *     <li>{@link FlinkKafkaProducer#flush()}</li>
    + *     <li>{@link FlinkKafkaProducer#getProducerId()}</li>
    + *     <li>{@link FlinkKafkaProducer#getEpoch()}</li>
    + *     <li>node failure... restore producerId and epoch from state</li>
    + *     <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
    + *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
    + * </ul>
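    + *
    + * <p>For illustration, a minimal sketch of that restore-and-commit path (the state handling around it is
    + * assumed and not part of this class; {@code restoredProducerId} and {@code restoredEpoch} are hypothetical
    + * values read back from state):
    + *
    + * <pre>{@code
    + * FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(properties);
    + * producer.resumeTransaction(restoredProducerId, restoredEpoch);
    + * producer.commitTransaction();
    + * }</pre>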
    + *
    + * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
    + * as the way to obtain the producerId and epoch counters. This is necessary because otherwise
    + * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions.
    + *
    + * <p>The second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
    + * is that it flushes newly added partitions on {@link FlinkKafkaProducer#flush()} instead of on
    + * {@link FlinkKafkaProducer#commitTransaction()}.
    + *
    + * <p>The last, minor difference is that it exposes the producerId and epoch counters via the
    + * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods
    + * (in {@link org.apache.kafka.clients.producer.KafkaProducer} these are unfortunately private fields).
    + *
    + * <p>Those changes are compatible with the Kafka 0.11.0 API, although making them possible was clearly
    + * not the intention of the Kafka API authors.
    + *
    + * <p>Internally this implementation wraps {@link org.apache.kafka.clients.producer.KafkaProducer} and
    + * implements the required changes via the Java Reflection API. It might not be the prettiest solution;
    + * the alternative would be to re-implement the whole Kafka 0.11 client on our own.
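    + *
    + * <p>A hypothetical illustration of the reflection pattern this class relies on (the concrete private
    + * fields accessed are Kafka client implementation details, not public API):
    + *
    + * <pre>{@code
    + * private static Object getField(Object object, String fieldName) throws Exception {
    + *     Field field = object.getClass().getDeclaredField(fieldName);
    + *     field.setAccessible(true); // private field access; this is exactly the fragile part
    + *     return field.get(object);
    + * }
    + * }</pre>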
    + */
    +public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
    +   private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    +
    +   private final KafkaProducer<K, V> kafkaProducer;
    +   @Nullable
    +   private final String transactionalId;
    +
    +   public FlinkKafkaProducer(Properties properties) {
    +           transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
    +           kafkaProducer = new KafkaProducer<>(properties);
    +   }
    +
    +   // -------------------------------- Simple proxy method calls --------------------------------
    +
    +   @Override
    +   public void initTransactions() {
    +           kafkaProducer.initTransactions();
    +   }
    +
    +   @Override
    +   public void beginTransaction() throws ProducerFencedException {
    +           kafkaProducer.beginTransaction();
    +   }
    +
    +   @Override
    +   public void commitTransaction() throws ProducerFencedException {
    +           kafkaProducer.commitTransaction();
    +   }
    +
    +   @Override
    +   public void abortTransaction() throws ProducerFencedException {
    +           kafkaProducer.abortTransaction();
    +   }
    +
    +   @Override
    +   public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
    +           kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
    +   }
    +
    +   @Override
    +   public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    +           return kafkaProducer.send(record);
    +   }
    +
    +   @Override
    +   public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    +           return kafkaProducer.send(record, callback);
    +   }
    +
    +   @Override
    +   public List<PartitionInfo> partitionsFor(String topic) {
    +           return kafkaProducer.partitionsFor(topic);
    +   }
    +
    +   @Override
    +   public Map<MetricName, ? extends Metric> metrics() {
    +           return kafkaProducer.metrics();
    +   }
    +
    +   @Override
    +   public void close() {
    +           kafkaProducer.close();
    +   }
    +
    +   @Override
    +   public void close(long timeout, TimeUnit unit) {
    +           kafkaProducer.close(timeout, unit);
    +   }
    +
    +   // -------------------------------- New methods or methods with changed behaviour --------------------------------
    +
    +   @Override
    +   public void flush() {
    +           kafkaProducer.flush();
    +           if (transactionalId != null) {
    +                   flushNewPartitions();
    +           }
    +   }
    +
    +   public void resumeTransaction(long producerId, short epoch) {
    +           if (!(producerId >= 0 && epoch >= 0)) {
    +                   throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
    --- End diff --
    
    Can use `Preconditions.checkState(...)` here.
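    
    For example, something like this (just a sketch; assumes Flink's `org.apache.flink.util.Preconditions`):
    
    ```java
    import static org.apache.flink.util.Preconditions.checkState;
    
    checkState(producerId >= 0 && epoch >= 0,
            "Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch);
    ```
    
    `checkState` throws the same `IllegalStateException`, and only formats the message when the check fails.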


> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (to be released very soon) adds support for transactions. Thanks to that, Flink might be able to implement a Kafka sink supporting "exactly-once" semantics. The API changes and the transaction support as a whole are described in [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new FlinkKafkaProducer011 would:
> * upon creation, begin a transaction, store the transaction identifiers in state, and write all incoming data to an output Kafka topic using that transaction
> * on a `snapshotState` call, flush the data and record in state that the current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, either abort this pending transaction (if not every participant successfully saved its snapshot) or restore and commit it (see the sketch below).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
