[
https://issues.apache.org/jira/browse/FLINK-24591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Krzysztof Dziolak updated FLINK-24591:
--------------------------------------
Description:
We have identified an issue when using Kafka Producer with Flink 1.13 under
*cluster.intercept-user-system-exit* preset:
{code:java}
java.lang.SecurityException: java.lang.SecurityException: setContextClassLoader
at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at
java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
at
java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
at
java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.SecurityException: setContextClassLoader
at
java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
at
org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
{code}
Impact:
* any applications using Kafka Producer
* when cluster.intercept-user-system-exit setting is used
Tested with connector version 1.13.2
Same data serialized to a file works fine (look the filesystem example in the
reproducer)
was:
We have identified an issue when using Kafka Producer with Flink 1.13 under `
{code:java}
java.lang.SecurityException: java.lang.SecurityException: setContextClassLoader
at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at
java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
at
java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
at
java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at
java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.SecurityException: setContextClassLoader
at
java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
at
org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
{code}
Look in the attachments for a reproducer.
Same data serialized to a file works fine (look the filesystem example in the
reproducer)
> Kafka Producer fails with SecurityException when using
> cluster.intercept-user-system-exit
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-24591
> URL: https://issues.apache.org/jira/browse/FLINK-24591
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.2
> Reporter: Krzysztof Dziolak
> Priority: Major
>
> We have identified an issue when using Kafka Producer with Flink 1.13 under
> *cluster.intercept-user-system-exit* preset:
> {code:java}
> java.lang.SecurityException: java.lang.SecurityException:
> setContextClassLoader
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> at
> java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
> at
> java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
> at
> java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
> at
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
> at
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
> at
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
> at
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> at
> java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.SecurityException: setContextClassLoader
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.setContextClassLoader(ForkJoinWorkerThread.java:240)
> at
> org.apache.flink.util.TemporaryClassLoaderContext.of(TemporaryClassLoaderContext.java:61)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1270)
> at
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
> at
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at
> java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
> at
> java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> {code}
> Impact:
> * any applications using Kafka Producer
> * when cluster.intercept-user-system-exit setting is used
> Tested with connector version 1.13.2
> Same data serialized to a file works fine (look the filesystem example in the
> reproducer)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)