[
https://issues.apache.org/jira/browse/FLINK-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199889#comment-17199889
]
Aljoscha Krettek commented on FLINK-14012:
------------------------------------------
I recall there were several classloader changes in Flink 1.10.x; I don't know
right now which one exactly fixed it. Could you maybe try Flink 1.10.x to see
if your problem is fixed? Then we could try to find the commit that actually
fixed it.
> Failed to start job for consuming Secure Kafka after the job cancel
> -------------------------------------------------------------------
>
> Key: FLINK-14012
> URL: https://issues.apache.org/jira/browse/FLINK-14012
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.9.0
> Environment: * Kubernetes 1.13.2
> * Flink 1.9.0
> * Kafka client libary 2.2.0
> Reporter: Daebeom Lee
> Priority: Minor
>
> Hello, this is Daebeom Lee.
> h2. Background
> I installed Flink 1.9.0 on our Kubernetes cluster.
> We use a Flink session cluster: we build a fat JAR, upload it through the UI,
> and run several jobs from it.
> At first our jobs start fine, but after we cancel some jobs, restarting a job
> fails with the following error:
> {code:java}
> java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
>     at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
>     at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
>     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
>     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
>     at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
>     at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
>     at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
>     at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
>     at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
>     at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
>     at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
>     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
>     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Our workaround
> * I think this is a Flink JVM classloader issue.
> * The user classloader is unloaded when a job is canceled, and the Kafka
> client library was included in our fat JAR.
> * So I placed the Kafka client library in /opt/flink/lib instead:
> ** /opt/flink/lib/kafka-clients-2.2.0.jar
> * That resolved the issue.
> * But there are some odd points:
> ** Two weeks ago, Flink 1.8.1 had no such problem.
> ** One week ago I rolled back from 1.9.0 to 1.8.1, and the same errors
> occurred.
> ** Maybe the Docker image was changed in the Docker repository
> ([https://github.com/docker-flink/docker-flink])
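> As an alternative to relying only on JAR placement, Flink's classloading can
> be tuned so that Kafka classes are always resolved parent-first and therefore
> do not disappear with the per-job user classloader. This is only a sketch of
> a flink-conf.yaml change we have not verified ourselves for this issue:
> {code:yaml}
> # Resolve org.apache.kafka.* parent-first instead of child-first, so the
> # classes are not tied to the user classloader discarded on job cancel.
> classloader.parent-first-patterns.additional: org.apache.kafka.
> {code}
> Note this still requires kafka-clients to be on the cluster classpath (e.g.
> in /opt/flink/lib), since the parent classloader must be able to find the
> classes there.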
>
> h2. Suggestion
> * I'd like to know the exact reason this error occurs after upgrading to
> 1.9.0.
> * Does anybody know a better solution for this case?
>
> Thank you in advance.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)