This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new e39cfe7  [FLINK-16262][connectors/kafka] Set the context classloader 
for parallel stream in FlinkKafkaProducer (#11497)
e39cfe7 is described below

commit e39cfe7660daaeed4213f04ccbce6de1e8d90fe5
Author: guowei.mgw <[email protected]>
AuthorDate: Sat Mar 28 21:26:31 2020 +0800

    [FLINK-16262][connectors/kafka] Set the context classloader for parallel 
stream in FlinkKafkaProducer (#11497)
    
    This closes #11497
---
 .../connectors/kafka/FlinkKafkaProducer.java       | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 821a8f1..40d8644 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -1096,18 +1097,24 @@ public class FlinkKafkaProducer<IN>
        // ----------------------------------- Utilities 
--------------------------
 
        private void abortTransactions(final Set<String> transactionalIds) {
+               final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
                transactionalIds.parallelStream().forEach(transactionalId -> {
-                       // don't mess with the original configuration or any 
other properties of the
-                       // original object
-                       // -> create an internal kafka producer on our own and 
do not rely on
-                       //    initTransactionalProducer().
-                       final Properties myConfig = new Properties();
-                       myConfig.putAll(producerConfig);
-                       initTransactionalProducerConfig(myConfig, 
transactionalId);
-                       try (FlinkKafkaInternalProducer<byte[], byte[]> 
kafkaProducer =
-                                       new 
FlinkKafkaInternalProducer<>(myConfig)) {
-                               // it suffices to call initTransactions - this 
will abort any lingering transactions
-                               kafkaProducer.initTransactions();
+                       // The parallelStream executes the consumer in a 
separated thread pool.
+                       // Because the consumer(e.g. Kafka) uses the context 
classloader to construct some class
+                       // we should set the correct classloader for it.
+                       try (TemporaryClassLoaderContext ignored = new 
TemporaryClassLoaderContext(classLoader)) {
+                               // don't mess with the original configuration 
or any other properties of the
+                               // original object
+                               // -> create an internal kafka producer on our 
own and do not rely on
+                               //    initTransactionalProducer().
+                               final Properties myConfig = new Properties();
+                               myConfig.putAll(producerConfig);
+                               initTransactionalProducerConfig(myConfig, 
transactionalId);
+                               try (FlinkKafkaInternalProducer<byte[], byte[]> 
kafkaProducer =
+                                               new 
FlinkKafkaInternalProducer<>(myConfig)) {
+                                       // it suffices to call initTransactions 
- this will abort any lingering transactions
+                                       kafkaProducer.initTransactions();
+                               }
                        }
                });
        }

Reply via email to