fapaul commented on a change in pull request #16676: URL: https://github.com/apache/flink/pull/16676#discussion_r683293143
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/FlinkKafkaInternalProducer.java ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.sink; + +import org.apache.flink.util.Preconditions; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Properties; + +/** + * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state. + */ +class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class); + private static final String TRANSACTION_MANAGER_STATE_ENUM = + "org.apache.kafka.clients.producer.internals.TransactionManager$State"; + + private final Properties kafkaProducerConfig; + @Nullable private final String transactionalId; + + public FlinkKafkaInternalProducer(Properties properties) { + super(properties); + this.kafkaProducerConfig = properties; + this.transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + } + + @Override + public void flush() { + super.flush(); + if (transactionalId != null) { + flushNewPartitions(); + } + } + + public Properties getKafkaProducerConfig() { + return kafkaProducerConfig; + } + + public short getEpoch() { + Object transactionManager = getField("transactionManager"); + Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch"); + return (short) getField(producerIdAndEpoch, "epoch"); + } + + public long getProducerId() { + Object transactionManager = getField("transactionManager"); + Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch"); + return (long) getField(producerIdAndEpoch, "producerId"); + } + + /** + * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} + * is also adding new partitions to the transaction. flushNewPartitions method is moving this + * logic to pre-commit/flush, to make resumeTransaction simpler. Otherwise resumeTransaction + * would require to restore state of the not yet added/"in-flight" partitions. + */ + private void flushNewPartitions() { + LOG.info("Flushing new partitions"); + TransactionalRequestResult result = enqueueNewPartitions(); + Object sender = getField("sender"); + invoke(sender, "wakeup"); + result.await(); + } + + /** + * Enqueues new transactions at the transaction manager and returns a {@link + * TransactionalRequestResult} that allows waiting on them. + * + * <p>If there are no new transactions we return a {@link TransactionalRequestResult} that is + * already done. + */ + private TransactionalRequestResult enqueueNewPartitions() { + Object transactionManager = getField("transactionManager"); + synchronized (transactionManager) { + Object newPartitionsInTransaction = + getField(transactionManager, "newPartitionsInTransaction"); + Object newPartitionsInTransactionIsEmpty = + invoke(newPartitionsInTransaction, "isEmpty"); + TransactionalRequestResult result; + if (newPartitionsInTransactionIsEmpty instanceof Boolean + && !((Boolean) newPartitionsInTransactionIsEmpty)) { + Object txnRequestHandler = + invoke(transactionManager, "addPartitionsToTransactionHandler"); + invoke( + transactionManager, + "enqueueRequest", + new Class[] {txnRequestHandler.getClass().getSuperclass()}, + new Object[] {txnRequestHandler}); + result = + (TransactionalRequestResult) + getField( + txnRequestHandler, + txnRequestHandler.getClass().getSuperclass(), + "result"); + } else { + // we don't have an operation but this operation string is also used in + // addPartitionsToTransactionHandler. + result = new TransactionalRequestResult("AddPartitionsToTxn"); + result.done(); + } + return result; + } + } + + private static Object invoke(Object object, String methodName, Object... args) { + Class<?>[] argTypes = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + argTypes[i] = args[i].getClass(); + } + return invoke(object, methodName, argTypes, args); + } + + private static Object invoke( + Object object, String methodName, Class<?>[] argTypes, Object[] args) { + try { + Method method = object.getClass().getDeclaredMethod(methodName, argTypes); + method.setAccessible(true); + return method.invoke(object, args); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException("Incompatible KafkaProducer version", e); + } + } Review comment: Can you give an example of what exactly you want to see here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
