[FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20728ba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20728ba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20728ba Branch: refs/heads/master Commit: d20728ba46977704827252ee5029bef9f949d5ab Parents: 7a35c35 Author: Piotr Nowojski <[email protected]> Authored: Wed Jul 12 15:14:13 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Oct 9 18:58:36 2017 +0200 ---------------------------------------------------------------------- .../kafka/internal/FlinkKafkaProducer.java | 294 +++++++++++++++++++ .../kafka/FlinkKafkaProducerTests.java | 114 +++++++ 2 files changed, 408 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java new file mode 100644 index 0000000..56b40d7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Wrapper around {@link KafkaProducer} that allows resuming transactions after a node failure, which makes it
 * possible to implement the two-phase commit algorithm for an exactly-once FlinkKafkaProducer.
 *
 * <p>On the happy path, usage is exactly the same as with {@link org.apache.kafka.clients.producer.KafkaProducer}.
 * The user is expected to call:
 *
 * <ul>
 *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
 *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
 *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
 *     <li>{@link FlinkKafkaProducer#flush()}</li>
 *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
 * </ul>
 *
 * <p>To actually implement two-phase commit, it must always be possible to commit a transaction after
 * pre-committing it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure between
 * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class allows resuming
 * the interrupted transaction and committing it after a restart:
 *
 * <ul>
 *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
 *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
 *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
 *     <li>{@link FlinkKafkaProducer#flush()}</li>
 *     <li>{@link FlinkKafkaProducer#getProducerId()}</li>
 *     <li>{@link FlinkKafkaProducer#getEpoch()}</li>
 *     <li>node failure... restore producerId and epoch from state</li>
 *     <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
 *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
 * </ul>
 *
 * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
 * as the way to obtain the producerId and epoch counters. This is necessary because otherwise
 * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions.
 *
 * <p>The second way this implementation differs from the reference
 * {@link org.apache.kafka.clients.producer.KafkaProducer} is that it actually flushes new partitions on
 * {@link FlinkKafkaProducer#flush()} instead of on {@link FlinkKafkaProducer#commitTransaction()}.
 *
 * <p>The last, minor difference is that it exposes the producerId and epoch counters via
 * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} (they are private fields
 * inside {@link KafkaProducer}).
 *
 * <p>Those changes are compatible with Kafka's 0.11.0 client API, although it clearly was not the intention of
 * Kafka's API authors to make them possible.
 *
 * <p>Internally this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements the
 * required changes via the Java Reflection API. It might not be the prettiest solution. An alternative would be to
 * re-implement the whole Kafka 0.11 producer client on our own.
 */
public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);

	private final KafkaProducer<K, V> kafkaProducer;

	/** Value of {@code transactional.id} from the configuration, or {@code null} for a non-transactional producer. */
	@Nullable
	private final String transactionalId;

	/**
	 * Creates the wrapped {@link KafkaProducer} from {@code properties}.
	 *
	 * @param properties Kafka producer configuration; {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} is also
	 *                   remembered to decide whether {@link #flush()} must flush new partitions
	 */
	public FlinkKafkaProducer(Properties properties) {
		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
		kafkaProducer = new KafkaProducer<>(properties);
	}

	// -------------------------------- Simple proxy method calls --------------------------------

	@Override
	public void initTransactions() {
		kafkaProducer.initTransactions();
	}

	@Override
	public void beginTransaction() throws ProducerFencedException {
		kafkaProducer.beginTransaction();
	}

	@Override
	public void commitTransaction() throws ProducerFencedException {
		kafkaProducer.commitTransaction();
	}

	@Override
	public void abortTransaction() throws ProducerFencedException {
		kafkaProducer.abortTransaction();
	}

	@Override
	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
	}

	@Override
	public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
		return kafkaProducer.send(record);
	}

	@Override
	public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
		return kafkaProducer.send(record, callback);
	}

	@Override
	public List<PartitionInfo> partitionsFor(String topic) {
		return kafkaProducer.partitionsFor(topic);
	}

	@Override
	public Map<MetricName, ? extends Metric> metrics() {
		return kafkaProducer.metrics();
	}

	@Override
	public void close() {
		kafkaProducer.close();
	}

	@Override
	public void close(long timeout, TimeUnit unit) {
		kafkaProducer.close(timeout, unit);
	}

	// -------------------------------- New methods or methods with changed behaviour --------------------------------

	/**
	 * Flushes all buffered records. For a transactional producer this additionally enqueues the
	 * "add partitions to transaction" request, wakes up the sender, and blocks until that request completes, so
	 * that the partitions touched by the current transaction are registered before this method returns.
	 */
	@Override
	public void flush() {
		kafkaProducer.flush();
		if (transactionalId != null) {
			flushNewPartitions();
		}
	}

	/**
	 * Resumes a transaction previously started by a producer with the same transactional id, instead of calling
	 * {@link #initTransactions()} (which would abort it). Afterwards the transaction can be committed with
	 * {@link #commitTransaction()}.
	 *
	 * @param producerId producerId previously obtained via {@link #getProducerId()}; must be non-negative
	 * @param epoch epoch previously obtained via {@link #getEpoch()}; must be non-negative
	 * @throws IllegalStateException if {@code producerId} or {@code epoch} is negative
	 */
	public void resumeTransaction(long producerId, short epoch) {
		// Flink's Preconditions only understands "%s" placeholders (not SLF4J-style "{}").
		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
		LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);

		Object transactionManager = getValue(kafkaProducer, "transactionManager");
		// The producer's background sender thread also works on the TransactionManager (see flushNewPartitions).
		// Hold its monitor while rewriting internal state so the update is not observed half-applied.
		// NOTE(review): assumes Kafka's TransactionManager guards its state with its own monitor (synchronized
		// methods) in the supported Kafka version.
		synchronized (transactionManager) {
			Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");

			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
			invoke(sequenceNumbers, "clear");

			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
			setValue(producerIdAndEpoch, "producerId", producerId);
			setValue(producerIdAndEpoch, "epoch", epoch);

			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));

			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
			setValue(transactionManager, "transactionStarted", true);
		}
	}

	/** Returns the configured {@code transactional.id}, or {@code null} for a non-transactional producer. */
	@Nullable
	public String getTransactionalId() {
		return transactionalId;
	}

	/** Returns the producerId currently held by the wrapped producer's transaction manager (read via reflection). */
	public long getProducerId() {
		Object transactionManager = getValue(kafkaProducer, "transactionManager");
		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
		return (long) getValue(producerIdAndEpoch, "producerId");
	}

	/** Returns the epoch currently held by the wrapped producer's transaction manager (read via reflection). */
	public short getEpoch() {
		Object transactionManager = getValue(kafkaProducer, "transactionManager");
		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
		return (short) getValue(producerIdAndEpoch, "epoch");
	}

	/** Returns the node id of the transaction coordinator known to the wrapped producer. */
	@VisibleForTesting
	public int getTransactionCoordinatorId() {
		Object transactionManager = getValue(kafkaProducer, "transactionManager");
		Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
		return node.id();
	}

	/**
	 * Enqueues the "add partitions to transaction" request, wakes up the sender and blocks until the request
	 * has completed.
	 */
	private void flushNewPartitions() {
		LOG.info("Flushing new partitions");
		TransactionalRequestResult result = enqueueNewPartitions();
		Object sender = getValue(kafkaProducer, "sender");
		invoke(sender, "wakeup");
		// Wait outside the TransactionManager monitor; the sender thread presumably needs it to process the request.
		result.await();
	}

	/**
	 * Enqueues the TransactionManager's "add partitions to transaction" request handler while holding the
	 * TransactionManager monitor, and returns the result handle to wait on.
	 */
	private TransactionalRequestResult enqueueNewPartitions() {
		Object transactionManager = getValue(kafkaProducer, "transactionManager");
		synchronized (transactionManager) {
			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
			// enqueueRequest and the "result" field are declared on the handler's superclass.
			Class<?> handlerBaseClass = txnRequestHandler.getClass().getSuperclass();
			invoke(transactionManager, "enqueueRequest", new Class[]{handlerBaseClass}, new Object[]{txnRequestHandler});
			return (TransactionalRequestResult) getValue(txnRequestHandler, handlerBaseClass, "result");
		}
	}

	/**
	 * Resolves an enum constant from its fully qualified name, e.g. {@code "pkg.Outer$Inner.CONSTANT"}.
	 *
	 * @throws IllegalArgumentException if {@code enumFullName} has no {@code <class>.<constant>} form
	 * @throws RuntimeException if the class or constant does not exist (incompatible Kafka version)
	 */
	@SuppressWarnings({"unchecked", "rawtypes"})
	private static Enum<?> getEnum(String enumFullName) {
		// Split at the last dot: everything before is the (binary) class name, everything after the constant name.
		int separator = enumFullName.lastIndexOf('.');
		if (separator <= 0 || separator == enumFullName.length() - 1) {
			throw new IllegalArgumentException("Expected a fully qualified enum constant name, got: " + enumFullName);
		}
		String enumClassName = enumFullName.substring(0, separator);
		String enumName = enumFullName.substring(separator + 1);
		try {
			Class<Enum> enumClass = (Class<Enum>) Class.forName(enumClassName);
			return Enum.valueOf(enumClass, enumName);
		} catch (ClassNotFoundException | IllegalArgumentException e) {
			// IllegalArgumentException: the class is not an enum or has no such constant.
			throw new RuntimeException("Incompatible KafkaProducer version", e);
		}
	}

	/**
	 * Reflectively invokes a method declared directly on {@code object}'s class (private methods included),
	 * deriving the parameter types from the runtime classes of {@code args}. All arguments must therefore be
	 * non-null and their runtime classes must equal the declared parameter types.
	 */
	private static Object invoke(Object object, String methodName, Object... args) {
		Class<?>[] argTypes = new Class[args.length];
		for (int i = 0; i < args.length; i++) {
			argTypes[i] = args[i].getClass();
		}
		return invoke(object, methodName, argTypes, args);
	}

	/** Reflectively invokes a method declared directly on {@code object}'s class with explicit parameter types. */
	private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
		try {
			Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
			method.setAccessible(true);
			return method.invoke(object, args);
		} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
			throw new RuntimeException("Incompatible KafkaProducer version", e);
		}
	}

	/** Reads a field declared directly on {@code object}'s class (private fields included). */
	private static Object getValue(Object object, String fieldName) {
		return getValue(object, object.getClass(), fieldName);
	}

	/** Reads a field declared on {@code clazz} from {@code object} (private fields included). */
	private static Object getValue(Object object, Class<?> clazz, String fieldName) {
		try {
			Field field = clazz.getDeclaredField(fieldName);
			field.setAccessible(true);
			return field.get(object);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new RuntimeException("Incompatible KafkaProducer version", e);
		}
	}

	/** Writes a field declared directly on {@code object}'s class (private fields included). */
	private static void setValue(Object object, String fieldName, Object value) {
		try {
			Field field = object.getClass().getDeclaredField(fieldName);
			field.setAccessible(true);
			field.set(object, value);
		} catch (NoSuchFieldException | IllegalAccessException e) {
			throw new RuntimeException("Incompatible KafkaProducer version", e);
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java new file mode 100644 index 0000000..18bbd8f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for our own {@link FlinkKafkaProducer}. + */ +@SuppressWarnings("serial") +public class FlinkKafkaProducerTests extends KafkaTestBase { + protected String transactionalId; + protected Properties extraProperties; + + @Before + public void before() { + transactionalId = UUID.randomUUID().toString(); + extraProperties = new Properties(); + extraProperties.putAll(standardProps); + extraProperties.put("transactional.id", transactionalId); + extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("isolation.level", "read_committed"); + } + + @Test(timeout = 30000L) + public void testHappyPath() throws IOException { + String topicName = "flink-kafka-producer-happy-path"; + try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new 
ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.commitTransaction(); + } + assertRecord(topicName, "42", "42"); + deleteTestTopic(topicName); + } + + @Test(timeout = 30000L) + public void testResumeTransaction() throws IOException { + String topicName = "flink-kafka-producer-resume-transaction"; + try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.flush(); + long producerId = kafkaProducer.getProducerId(); + short epoch = kafkaProducer.getEpoch(); + + try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + + assertRecord(topicName, "42", "42"); + + // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction + kafkaProducer.commitTransaction(); + + // this shouldn't fail also, for same reason as above + try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + } + deleteTestTopic(topicName); + } + + private void assertRecord(String topicName, String expectedKey, String expectedValue) { + try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { + kafkaConsumer.subscribe(Collections.singletonList(topicName)); + ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); + + ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); + assertEquals(expectedKey, record.key()); + assertEquals(expectedValue, record.value()); + } + } +}
