[FLINK-6988][kafka] Implement our own KafkaProducer class with transactions 
recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20728ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20728ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20728ba

Branch: refs/heads/master
Commit: d20728ba46977704827252ee5029bef9f949d5ab
Parents: 7a35c35
Author: Piotr Nowojski <[email protected]>
Authored: Wed Jul 12 15:14:13 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/internal/FlinkKafkaProducer.java      | 294 +++++++++++++++++++
 .../kafka/FlinkKafkaProducerTests.java          | 114 +++++++
 2 files changed, 408 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
new file mode 100644
index 0000000..56b40d7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around {@link org.apache.kafka.clients.producer.KafkaProducer} that makes it possible to resume
+ * transactions after a node failure, which is required to implement the two-phase commit algorithm for the
+ * exactly-once FlinkKafkaProducer.
+ *
+ * <p>On the happy path, usage is exactly the same as for {@link org.apache.kafka.clients.producer.KafkaProducer}.
+ * The user is expected to call:
+ *
+ * <ul>
+ *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ *     <li>{@link FlinkKafkaProducer#flush()}</li>
+ *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>To actually implement two-phase commit, it must always be possible to commit a transaction after
+ * pre-committing it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure between
+ * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class lets the
+ * interrupted transaction be resumed and committed after a restart:
+ *
+ * <ul>
+ *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ *     <li>{@link FlinkKafkaProducer#flush()}</li>
+ *     <li>{@link FlinkKafkaProducer#getProducerId()}</li>
+ *     <li>{@link FlinkKafkaProducer#getEpoch()}</li>
+ *     <li>node failure... restore producerId and epoch from state</li>
+ *     <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)} (on a new instance created with the same
+ *     {@code transactional.id})</li>
+ *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
+ * as the way to obtain the producerId and epoch counters. This is necessary because
+ * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions.
+ *
+ * <p>The second difference from the reference {@link org.apache.kafka.clients.producer.KafkaProducer} is that this
+ * one flushes newly added partitions (by sending Kafka's add-partitions-to-transaction request and waiting for its
+ * result) on {@link FlinkKafkaProducer#flush()} instead of on {@link FlinkKafkaProducer#commitTransaction()}.
+ *
+ * <p>The last, minor difference is that it exposes the producerId and epoch counters via
+ * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} (in
+ * {@link org.apache.kafka.clients.producer.KafkaProducer} they are unfortunately private fields).
+ *
+ * <p>These changes are compatible with the Kafka 0.11.0 client/broker protocol, although it clearly was not the
+ * intention of the Kafka API authors to make them possible.
+ *
+ * <p>Internally this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements the
+ * required changes via the Java Reflection API. It might not be the prettiest solution. An alternative would be to
+ * re-implement the whole Kafka 0.11 producer client on our own.
+ *
+ * <p>Because it relies on private fields, methods and enum constants of Kafka's producer internals, a Kafka client
+ * version in which any of them is missing surfaces as a {@link RuntimeException} with the message
+ * "Incompatible KafkaProducer version".
+ */
+public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+       /** Fully qualified name of Kafka's internal transaction state enum, whose constants are looked up reflectively. */
+       private static final String TRANSACTION_MANAGER_STATE = "org.apache.kafka.clients.producer.internals.TransactionManager$State";
+
+       private final KafkaProducer<K, V> kafkaProducer;
+
+       /** Value of {@code transactional.id} from the producer properties; {@code null} for a non-transactional producer. */
+       @Nullable
+       private final String transactionalId;
+
+       /**
+        * Creates the wrapped {@link KafkaProducer} from the given properties.
+        *
+        * @param properties Kafka producer configuration; its {@code transactional.id} (if any) is remembered so that
+        *                   {@link #flush()} knows whether new partitions have to be flushed
+        */
+       public FlinkKafkaProducer(Properties properties) {
+               transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+               kafkaProducer = new KafkaProducer<>(properties);
+       }
+
+       // -------------------------------- Simple proxy method calls --------------------------------
+
+       @Override
+       public void initTransactions() {
+               kafkaProducer.initTransactions();
+       }
+
+       @Override
+       public void beginTransaction() throws ProducerFencedException {
+               kafkaProducer.beginTransaction();
+       }
+
+       @Override
+       public void commitTransaction() throws ProducerFencedException {
+               kafkaProducer.commitTransaction();
+       }
+
+       @Override
+       public void abortTransaction() throws ProducerFencedException {
+               kafkaProducer.abortTransaction();
+       }
+
+       @Override
+       public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
+               kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+       }
+
+       @Override
+       public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+               return kafkaProducer.send(record);
+       }
+
+       @Override
+       public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+               return kafkaProducer.send(record, callback);
+       }
+
+       @Override
+       public List<PartitionInfo> partitionsFor(String topic) {
+               return kafkaProducer.partitionsFor(topic);
+       }
+
+       @Override
+       public Map<MetricName, ? extends Metric> metrics() {
+               return kafkaProducer.metrics();
+       }
+
+       @Override
+       public void close() {
+               kafkaProducer.close();
+       }
+
+       @Override
+       public void close(long timeout, TimeUnit unit) {
+               kafkaProducer.close(timeout, unit);
+       }
+
+       // -------------------------------- New methods or methods with changed behaviour --------------------------------
+
+       /**
+        * Flushes all buffered records and, for a transactional producer, additionally flushes partitions newly added
+        * to the current transaction, blocking until that request has completed.
+        */
+       @Override
+       public void flush() {
+               kafkaProducer.flush();
+               if (transactionalId != null) {
+                       flushNewPartitions();
+               }
+       }
+
+       /**
+        * Resumes a transaction instead of calling {@link #initTransactions()}, which would abort it.
+        *
+        * <p>Writes the given producerId and epoch directly into Kafka's internal transaction manager and moves it into
+        * the in-transaction state, so that {@link #commitTransaction()} can be called next.
+        *
+        * @param producerId producerId obtained via {@link #getProducerId()} before the failure; must be non-negative
+        * @param epoch epoch obtained via {@link #getEpoch()} before the failure; must be non-negative
+        * @throws IllegalStateException if producerId or epoch is negative
+        */
+       public void resumeTransaction(long producerId, short epoch) {
+               // Flink's Preconditions only substitutes "%s" placeholders (not SLF4J-style "{}").
+               Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
+               LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
+
+               Object transactionManager = getValue(kafkaProducer, "transactionManager");
+               // NOTE(review): assumes Kafka's TransactionManager guards its state with its own monitor (its methods are
+               // synchronized in Kafka 0.11); hold the same lock so the sender thread never sees a half-updated state.
+               synchronized (transactionManager) {
+                       Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+
+                       invoke(transactionManager, "transitionTo", getEnum(TRANSACTION_MANAGER_STATE + ".INITIALIZING"));
+                       invoke(sequenceNumbers, "clear");
+
+                       Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+                       setValue(producerIdAndEpoch, "producerId", producerId);
+                       setValue(producerIdAndEpoch, "epoch", epoch);
+
+                       invoke(transactionManager, "transitionTo", getEnum(TRANSACTION_MANAGER_STATE + ".READY"));
+
+                       invoke(transactionManager, "transitionTo", getEnum(TRANSACTION_MANAGER_STATE + ".IN_TRANSACTION"));
+                       setValue(transactionManager, "transactionStarted", true);
+               }
+       }
+
+       /** Returns the configured {@code transactional.id}, or {@code null} if none was set. */
+       public String getTransactionalId() {
+               return transactionalId;
+       }
+
+       /** Returns the producerId currently held by Kafka's internal transaction manager. */
+       public long getProducerId() {
+               Object transactionManager = getValue(kafkaProducer, "transactionManager");
+               Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+               return (long) getValue(producerIdAndEpoch, "producerId");
+       }
+
+       /** Returns the epoch currently held by Kafka's internal transaction manager. */
+       public short getEpoch() {
+               Object transactionManager = getValue(kafkaProducer, "transactionManager");
+               Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+               return (short) getValue(producerIdAndEpoch, "epoch");
+       }
+
+       /**
+        * Returns the broker id of the transaction coordinator known to Kafka's internal transaction manager.
+        *
+        * <p>NOTE(review): if no coordinator is known yet, {@code coordinator(...)} presumably returns {@code null} and
+        * this method throws a {@link NullPointerException}.
+        */
+       @VisibleForTesting
+       public int getTransactionCoordinatorId() {
+               Object transactionManager = getValue(kafkaProducer, "transactionManager");
+               Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+               return node.id();
+       }
+
+       /**
+        * Enqueues Kafka's add-partitions-to-transaction request, wakes up the sender so the request is dispatched,
+        * and blocks until its {@link TransactionalRequestResult} completes.
+        */
+       private void flushNewPartitions() {
+               LOG.info("Flushing new partitions");
+               Object transactionManager = getValue(kafkaProducer, "transactionManager");
+               TransactionalRequestResult result;
+               // NOTE(review): assumes Kafka's TransactionManager guards its request queue with its own monitor (as in
+               // Kafka 0.11). Enqueue under that lock, but await outside of it: completing the request needs the sender.
+               synchronized (transactionManager) {
+                       Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+                       Class<?> handlerBaseClass = txnRequestHandler.getClass().getSuperclass();
+                       invoke(transactionManager, "enqueueRequest", new Class<?>[]{handlerBaseClass}, new Object[]{txnRequestHandler});
+                       result = (TransactionalRequestResult) getValue(txnRequestHandler, handlerBaseClass, "result");
+               }
+               Object sender = getValue(kafkaProducer, "sender");
+               invoke(sender, "wakeup");
+               result.await();
+       }
+
+       /**
+        * Resolves an enum constant from its fully qualified name, e.g. {@code some.pkg.Outer$State.READY}.
+        *
+        * @throws IllegalArgumentException if the name has no "class.CONSTANT" form
+        * @throws RuntimeException if the enum class or the constant does not exist (incompatible Kafka version)
+        */
+       private static Enum<?> getEnum(String enumFullName) {
+               // Split on the last '.' only: everything before it is the (binary) class name.
+               String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
+               if (x.length != 2) {
+                       throw new IllegalArgumentException("Expected a fully qualified enum constant name, got: " + enumFullName);
+               }
+               String enumClassName = x[0];
+               String enumName = x[1];
+               try {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
+                       return Enum.valueOf(cl, enumName);
+               } catch (ClassNotFoundException | IllegalArgumentException e) {
+                       // IllegalArgumentException: the class is not an enum or has no such constant.
+                       throw new RuntimeException("Incompatible KafkaProducer version", e);
+               }
+       }
+
+       /** Invokes a declared method, deriving the parameter types from the runtime classes of the (non-null) args. */
+       private static Object invoke(Object object, String methodName, Object... args) {
+               Class<?>[] argTypes = new Class<?>[args.length];
+               for (int i = 0; i < args.length; i++) {
+                       argTypes[i] = args[i].getClass();
+               }
+               return invoke(object, methodName, argTypes, args);
+       }
+
+       /**
+        * Invokes a method declared directly on {@code object}'s class, bypassing access checks.
+        *
+        * @throws RuntimeException "Incompatible KafkaProducer version" if the method cannot be found or accessed
+        */
+       private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+               try {
+                       Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+                       method.setAccessible(true);
+                       return method.invoke(object, args);
+               } catch (NoSuchMethodException | IllegalAccessException e) {
+                       throw new RuntimeException("Incompatible KafkaProducer version", e);
+               } catch (InvocationTargetException e) {
+                       // The method exists and was called, but threw: surface Kafka's own exception instead of
+                       // misreporting it as a version incompatibility.
+                       Throwable cause = e.getCause();
+                       if (cause instanceof RuntimeException) {
+                               throw (RuntimeException) cause;
+                       }
+                       if (cause instanceof Error) {
+                               throw (Error) cause;
+                       }
+                       throw new RuntimeException("Failed to invoke " + methodName + " on " + object.getClass().getName(), cause);
+               }
+       }
+
+       /** Reads a field declared directly on {@code object}'s class, bypassing access checks. */
+       private static Object getValue(Object object, String fieldName) {
+               return getValue(object, object.getClass(), fieldName);
+       }
+
+       /** Reads a field declared on {@code clazz} (e.g. a superclass of {@code object}), bypassing access checks. */
+       private static Object getValue(Object object, Class<?> clazz, String fieldName) {
+               try {
+                       Field field = clazz.getDeclaredField(fieldName);
+                       field.setAccessible(true);
+                       return field.get(object);
+               } catch (NoSuchFieldException | IllegalAccessException e) {
+                       throw new RuntimeException("Incompatible KafkaProducer version", e);
+               }
+       }
+
+       /** Writes a field declared directly on {@code object}'s class, bypassing access checks. */
+       private static void setValue(Object object, String fieldName, Object value) {
+               try {
+                       Field field = object.getClass().getDeclaredField(fieldName);
+                       field.setAccessible(true);
+                       field.set(object, value);
+               } catch (NoSuchFieldException | IllegalAccessException e) {
+                       throw new RuntimeException("Incompatible KafkaProducer version", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
new file mode 100644
index 0000000..18bbd8f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for our own {@link FlinkKafkaProducer}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducerTests extends KafkaTestBase {
+       protected String transactionalId;
+       protected Properties extraProperties;
+
+       /**
+        * Builds shared producer/consumer properties: a fresh random {@code transactional.id}, String (de)serializers
+        * and {@code read_committed} isolation, so only committed records are visible to {@link #assertRecord}.
+        */
+       @Before
+       public void before() {
+               transactionalId = UUID.randomUUID().toString();
+               extraProperties = new Properties();
+               extraProperties.putAll(standardProps);
+               extraProperties.put("transactional.id", transactionalId);
+               extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("isolation.level", "read_committed");
+       }
+
+       /** A transaction committed through the plain {@link Producer} API must be visible to a read_committed consumer. */
+       @Test(timeout = 30000L)
+       public void testHappyPath() throws IOException {
+               String topicName = "flink-kafka-producer-happy-path";
+               try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+                       kafkaProducer.commitTransaction();
+               }
+               assertRecord(topicName, "42", "42");
+               deleteTestTopic(topicName);
+       }
+
+       /**
+        * A flushed but uncommitted transaction can be committed by a second producer via
+        * {@link FlinkKafkaProducer#resumeTransaction(long, short)}; committing the same transaction again (from the
+        * original producer or from another resumed one) must not fail.
+        */
+       @Test(timeout = 30000L)
+       public void testResumeTransaction() throws IOException {
+               String topicName = "flink-kafka-producer-resume-transaction";
+               try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+                       kafkaProducer.flush();
+                       long producerId = kafkaProducer.getProducerId();
+                       short epoch = kafkaProducer.getEpoch();
+
+                       try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+                               resumeProducer.resumeTransaction(producerId, epoch);
+                               resumeProducer.commitTransaction();
+                       }
+
+                       assertRecord(topicName, "42", "42");
+
+                       // this shouldn't throw - in case of network split, old producer might attempt to commit its transaction
+                       kafkaProducer.commitTransaction();
+
+                       // this shouldn't fail also, for same reason as above
+                       try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+                               resumeProducer.resumeTransaction(producerId, epoch);
+                               resumeProducer.commitTransaction();
+                       }
+               }
+               deleteTestTopic(topicName);
+       }
+
+       /**
+        * Asserts that the first non-empty batch read from {@code topicName} consists of exactly one record with the
+        * given key and value.
+        *
+        * <p>Keeps polling until a batch arrives: a single poll may legitimately come back empty (presumably while the
+        * consumer's partition assignment is still in progress), which would make the assertion flaky. The test-level
+        * timeout bounds this loop.
+        */
+       private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+               try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+                       kafkaConsumer.subscribe(Collections.singletonList(topicName));
+                       ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+                       while (records.isEmpty()) {
+                               records = kafkaConsumer.poll(10000);
+                       }
+
+                       ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+                       assertEquals(expectedKey, record.key());
+                       assertEquals(expectedValue, record.value());
+               }
+       }
+}

Reply via email to