fapaul commented on a change in pull request #16676:
URL: https://github.com/apache/flink/pull/16676#discussion_r683293143



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+/**
+ * A {@link KafkaProducer} that exposes private fields to allow resume 
producing from a given state.
+ */
+class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
+    private static final String TRANSACTION_MANAGER_STATE_ENUM =
+            
"org.apache.kafka.clients.producer.internals.TransactionManager$State";
+
+    private final Properties kafkaProducerConfig;
+    @Nullable private final String transactionalId;
+
+    public FlinkKafkaInternalProducer(Properties properties) {
+        super(properties);
+        this.kafkaProducerConfig = properties;
+        this.transactionalId = 
properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+    }
+
+    @Override
+    public void flush() {
+        super.flush();
+        if (transactionalId != null) {
+            flushNewPartitions();
+        }
+    }
+
+    public Properties getKafkaProducerConfig() {
+        return kafkaProducerConfig;
+    }
+
+    public short getEpoch() {
+        Object transactionManager = getField("transactionManager");
+        Object producerIdAndEpoch = getField(transactionManager, 
"producerIdAndEpoch");
+        return (short) getField(producerIdAndEpoch, "epoch");
+    }
+
+    public long getProducerId() {
+        Object transactionManager = getField("transactionManager");
+        Object producerIdAndEpoch = getField(transactionManager, 
"producerIdAndEpoch");
+        return (long) getField(producerIdAndEpoch, "producerId");
+    }
+
+    /**
+     * Besides committing {@link 
org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
+     * is also adding new partitions to the transaction. flushNewPartitions 
method is moving this
+     * logic to pre-commit/flush, to make resumeTransaction simpler. Otherwise 
resumeTransaction
+     * would require to restore state of the not yet added/"in-flight" 
partitions.
+     */
+    private void flushNewPartitions() {
+        LOG.info("Flushing new partitions");
+        TransactionalRequestResult result = enqueueNewPartitions();
+        Object sender = getField("sender");
+        invoke(sender, "wakeup");
+        result.await();
+    }
+
+    /**
+     * Enqueues new transactions at the transaction manager and returns a 
{@link
+     * TransactionalRequestResult} that allows waiting on them.
+     *
+     * <p>If there are no new transactions we return a {@link 
TransactionalRequestResult} that is
+     * already done.
+     */
+    private TransactionalRequestResult enqueueNewPartitions() {
+        Object transactionManager = getField("transactionManager");
+        synchronized (transactionManager) {
+            Object newPartitionsInTransaction =
+                    getField(transactionManager, "newPartitionsInTransaction");
+            Object newPartitionsInTransactionIsEmpty =
+                    invoke(newPartitionsInTransaction, "isEmpty");
+            TransactionalRequestResult result;
+            if (newPartitionsInTransactionIsEmpty instanceof Boolean
+                    && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+                Object txnRequestHandler =
+                        invoke(transactionManager, 
"addPartitionsToTransactionHandler");
+                invoke(
+                        transactionManager,
+                        "enqueueRequest",
+                        new Class[] 
{txnRequestHandler.getClass().getSuperclass()},
+                        new Object[] {txnRequestHandler});
+                result =
+                        (TransactionalRequestResult)
+                                getField(
+                                        txnRequestHandler,
+                                        
txnRequestHandler.getClass().getSuperclass(),
+                                        "result");
+            } else {
+                // we don't have an operation but this operation string is 
also used in
+                // addPartitionsToTransactionHandler.
+                result = new TransactionalRequestResult("AddPartitionsToTxn");
+                result.done();
+            }
+            return result;
+        }
+    }
+
+    private static Object invoke(Object object, String methodName, Object... 
args) {
+        Class<?>[] argTypes = new Class[args.length];
+        for (int i = 0; i < args.length; i++) {
+            argTypes[i] = args[i].getClass();
+        }
+        return invoke(object, methodName, argTypes, args);
+    }
+
+    private static Object invoke(
+            Object object, String methodName, Class<?>[] argTypes, Object[] 
args) {
+        try {
+            Method method = object.getClass().getDeclaredMethod(methodName, 
argTypes);
+            method.setAccessible(true);
+            return method.invoke(object, args);
+        } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
+            throw new RuntimeException("Incompatible KafkaProducer version", 
e);
+        }
+    }

Review comment:
       Can you give an example of what exactly you want to see here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to