zentol commented on a change in pull request #11247: [FLINK-16262][Connectors] Set the context classloader for parallel stream in FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/11247#discussion_r385356328
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -1095,18 +1096,21 @@ private void resetAvailableTransactionalIdsPool(Collection<String> transactional
     // ----------------------------------- Utilities --------------------------
     private void abortTransactions(final Set<String> transactionalIds) {
+        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         transactionalIds.parallelStream().forEach(transactionalId -> {
-            // don't mess with the original configuration or any other properties of the
-            // original object
-            // -> create an internal kafka producer on our own and do not rely on
-            //    initTransactionalProducer().
-            final Properties myConfig = new Properties();
-            myConfig.putAll(producerConfig);
-            initTransactionalProducerConfig(myConfig, transactionalId);
-            try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
-                    new FlinkKafkaInternalProducer<>(myConfig)) {
-                // it suffices to call initTransactions - this will abort any lingering transactions
-                kafkaProducer.initTransactions();
+            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
Review comment:
Please add a comment noting that `parallelStream` executes the consumer on a
separate thread pool.
At a glance, this context can easily look like a no-op if you miss that
`parallelStream` is used and what its semantics are.
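
To illustrate the point, here is a minimal, self-contained sketch of the pattern. It is not the PR's code: `ClassLoaderScope` is a hypothetical stand-in for Flink's `TemporaryClassLoaderContext`, and `observe` and the `tx-N` ids are made up for the demo. The idea is that the lambda may run on pool threads whose context classloader differs from the caller's, so the caller's loader is captured up front and installed inside the lambda.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ParallelStreamClassLoaderDemo {

    /** Hypothetical stand-in for Flink's TemporaryClassLoaderContext. */
    static final class ClassLoaderScope implements AutoCloseable {
        private final Thread thread;
        private final ClassLoader original;

        private ClassLoaderScope(Thread thread, ClassLoader original) {
            this.thread = thread;
            this.original = original;
        }

        /** Installs the given loader on the current thread and remembers the previous one. */
        static ClassLoaderScope of(ClassLoader loader) {
            Thread current = Thread.currentThread();
            ClassLoader previous = current.getContextClassLoader();
            current.setContextClassLoader(loader);
            return new ClassLoaderScope(current, previous);
        }

        /** Restores the loader that was active before of(...) was called. */
        @Override
        public void close() {
            thread.setContextClassLoader(original);
        }
    }

    /**
     * Runs a parallel stream over the ids and records the context classloader
     * each element sees inside the scope.
     */
    static Map<String, ClassLoader> observe(ClassLoader callerLoader, Collection<String> ids) {
        Map<String, ClassLoader> seen = new ConcurrentHashMap<>();
        // parallelStream may run this lambda on fork/join pool threads (as well as
        // on the calling thread), so the loader has to be set inside the lambda.
        ids.parallelStream().forEach(id -> {
            try (ClassLoaderScope ignored = ClassLoaderScope.of(callerLoader)) {
                seen.put(id, Thread.currentThread().getContextClassLoader());
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        ClassLoader custom = new URLClassLoader(
                new URL[0], ParallelStreamClassLoaderDemo.class.getClassLoader());
        Map<String, ClassLoader> seen =
                observe(custom, Arrays.asList("tx-0", "tx-1", "tx-2", "tx-3"));
        seen.forEach((id, loader) ->
                System.out.println(id + " saw the caller's loader: " + (loader == custom)));
    }
}
```

Without the `try` block, which loader each element sees depends on which thread happens to pick it up, which is why the wrapper is not a no-op here.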