This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0d0c9  [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
ff0d0c9 is described below

commit ff0d0c979d7cf67648ecf91850e782e99d557240
Author: guowei.mgw <[email protected]>
AuthorDate: Sat Mar 28 21:22:57 2020 +0800

    [FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
    
    This closes #11247
---
 .../connectors/kafka/FlinkKafkaProducer.java       | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3a92c81..9b42b5f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -1095,18 +1096,24 @@ public class FlinkKafkaProducer<IN>
        // ----------------------------------- Utilities 
--------------------------
 
        private void abortTransactions(final Set<String> transactionalIds) {
+               final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
                transactionalIds.parallelStream().forEach(transactionalId -> {
-                       // don't mess with the original configuration or any 
other properties of the
-                       // original object
-                       // -> create an internal kafka producer on our own and 
do not rely on
-                       //    initTransactionalProducer().
-                       final Properties myConfig = new Properties();
-                       myConfig.putAll(producerConfig);
-                       initTransactionalProducerConfig(myConfig, 
transactionalId);
-                       try (FlinkKafkaInternalProducer<byte[], byte[]> 
kafkaProducer =
-                                       new 
FlinkKafkaInternalProducer<>(myConfig)) {
-                               // it suffices to call initTransactions - this 
will abort any lingering transactions
-                               kafkaProducer.initTransactions();
+                       // The parallelStream executes the consumer in a 
separated thread pool.
+                       // Because the consumer(e.g. Kafka) uses the context 
classloader to construct some class
+                       // we should set the correct classloader for it.
+                       try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(classLoader)) {
+                               // don't mess with the original configuration 
or any other properties of the
+                               // original object
+                               // -> create an internal kafka producer on our 
own and do not rely on
+                               //    initTransactionalProducer().
+                               final Properties myConfig = new Properties();
+                               myConfig.putAll(producerConfig);
+                               initTransactionalProducerConfig(myConfig, 
transactionalId);
+                               try (FlinkKafkaInternalProducer<byte[], byte[]> 
kafkaProducer =
+                                               new 
FlinkKafkaInternalProducer<>(myConfig)) {
+                                       // it suffices to call initTransactions 
- this will abort any lingering transactions
+                                       kafkaProducer.initTransactions();
+                               }
                        }
                });
        }

Reply via email to