[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136427#comment-16136427 ]
ASF GitHub Bot commented on FLINK-6988:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4239#discussion_r134399048
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around KafkaProducer that allows resuming transactions in case of node failure, which makes it
+ * possible to implement the two-phase commit algorithm needed for an exactly-once FlinkKafkaProducer.
+ *
+ * <p>For the happy path, usage is exactly the same as with {@link org.apache.kafka.clients.producer.KafkaProducer}.
+ * The user is expected to call (see the sketch after this list):
+ *
+ * <ul>
+ * <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ * <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ * <li>{@link FlinkKafkaProducer#flush()}</li>
+ * <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
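+ *
+ * <p>A minimal happy-path sketch (the constructor used, the topic name and the record value are illustrative only):
+ *
+ * <pre>{@code
+ * Properties properties = new Properties();       // standard Kafka producer configuration goes here
+ * FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(properties);
+ * producer.initTransactions();
+ * producer.beginTransaction();
+ * producer.send(new ProducerRecord<>("my-topic", "some value".getBytes()));
+ * producer.flush();
+ * producer.commitTransaction();
+ * }</pre>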
+ *
+ * <p>To actually implement two-phase commit, it must always be possible to commit a transaction after
+ * pre-committing it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure between
+ * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class allows
+ * resuming the interrupted transaction and committing it after a restart (a code sketch follows the list):
+ *
+ * <ul>
+ * <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ * <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ * <li>{@link FlinkKafkaProducer#flush()}</li>
+ * <li>{@link FlinkKafkaProducer#getProducerId()}</li>
+ * <li>{@link FlinkKafkaProducer#getEpoch()}</li>
+ * <li>node failure... restore producerId and epoch from state</li>
+ * <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
+ * <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
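+ *
+ * <p>A sketch of the resume-and-commit path (how producerId and epoch are persisted across the restart is left
+ * to the caller):
+ *
+ * <pre>{@code
+ * // before the failure, after flush():
+ * long producerId = producer.getProducerId();
+ * short epoch = producer.getEpoch();
+ * // ... persist producerId and epoch in state ...
+ *
+ * // after a restart, on a newly created producer:
+ * producer.resumeTransaction(producerId, epoch);
+ * producer.commitTransaction();
+ * }</pre>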
+ *
+ * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
+ * as a way to obtain the producerId and epoch counters. This is necessary because otherwise
+ * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions.
+ *
+ * <p>The second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
+ * is that this one actually flushes new partitions on {@link FlinkKafkaProducer#flush()} instead of on
+ * {@link FlinkKafkaProducer#commitTransaction()}.
+ *
+ * <p>The last, minor difference is that it allows obtaining the producerId and epoch counters via the
+ * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (the underlying
+ * values are unfortunately kept in private fields).
+ *
+ * <p>Those changes are compatible with Kafka's 0.11.0 REST API, although it clearly was not the intention of
+ * Kafka's API authors to make them possible.
+ *
+ * <p>Internally this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements
+ * the required changes via the Java Reflection API. It might not be the prettiest solution. An alternative would
+ * be to re-implement the whole Kafka 0.11 REST API client on our own.
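+ *
+ * <p>A sketch of the reflection technique (the field name used here is an assumption about Kafka 0.11 internals,
+ * not a stable API):
+ *
+ * <pre>{@code
+ * private static Object getField(Object object, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ *     Field field = object.getClass().getDeclaredField(fieldName);
+ *     field.setAccessible(true);
+ *     return field.get(object);
+ * }
+ *
+ * // e.g. getField(kafkaProducer, "transactionManager")
+ * }</pre>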
+ */
+public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+ private final KafkaProducer<K, V> kafkaProducer;
+ @Nullable
--- End diff ---
nit: empty line before this field annotation.
> Add Apache Kafka 0.11 connector
> -------------------------------
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> Kafka 0.11 (it will be released very soon) adds support for transactions.
> Thanks to that, Flink might be able to implement a Kafka sink supporting
> "exactly-once" semantics. The API changes and the whole transactions support are
> described in
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new
> FlinkKafkaProducer011 would (a rough sketch follows the list):
> * upon creation, begin a transaction, store the transaction identifiers into
> state, and write all incoming data to the output Kafka topic using that
> transaction
> * on the `snapshotState` call, flush the data and record in state that the
> current transaction is pending to be committed
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`,
> either abort this pending transaction (if not every participant successfully
> saved the snapshot) or restore and commit it.
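> A rough lifecycle sketch (class shape, method names and state handling are
> illustrative only, not a final design):
>
>     import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     class ExactlyOnceKafkaSinkSketch {
>         private final FlinkKafkaProducer<byte[], byte[]> producer;
>
>         ExactlyOnceKafkaSinkSketch(FlinkKafkaProducer<byte[], byte[]> producer) {
>             this.producer = producer;
>             producer.initTransactions();
>             producer.beginTransaction();   // upon creation: open the first transaction
>         }
>
>         void invoke(ProducerRecord<byte[], byte[]> record) {
>             producer.send(record);         // write within the currently open transaction
>         }
>
>         void snapshotState() {
>             producer.flush();              // pre-commit: make the pending data durable in Kafka
>             // persist the transaction identifiers (producerId/epoch) in Flink state here
>         }
>
>         void notifyCheckpointComplete() {
>             producer.commitTransaction();  // second phase: commit the pending transaction
>             producer.beginTransaction();   // open a transaction for the next checkpoint period
>         }
>     }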
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)