[
https://issues.apache.org/jira/browse/FLINK-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192568#comment-17192568
]
Daebeom Lee commented on FLINK-14012:
-------------------------------------
I have since used 1.10.x and 1.11.0, and this issue appears to be resolved in those versions.
I'll close this issue.
Thank you for your comment.
> Failed to start job for consuming Secure Kafka after the job cancel
> -------------------------------------------------------------------
>
> Key: FLINK-14012
> URL: https://issues.apache.org/jira/browse/FLINK-14012
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.9.0
> Environment: * Kubernetes 1.13.2
> * Flink 1.9.0
> * Kafka client library 2.2.0
> Reporter: Daebeom Lee
> Priority: Minor
>
> Hello, this is Daebeom Lee.
> h2. Background
> I installed Flink 1.9.0 on our Kubernetes cluster.
> We run a Flink session cluster: we build a fat JAR, upload it through the UI,
> and run several jobs.
> At first, the jobs start fine.
> But after we cancel some jobs, restarting a job fails.
> This is the error:
> {code:java}
> java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
> at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
> at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
> at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
> at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
> at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
> at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
> at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
> at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
> at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
> at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
> at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Our workaround
> * I think this is a Flink JVM classloader issue.
> * The user classloader is unloaded when a job is cancelled, and the Kafka
> client library is included in the fat JAR.
> * So I placed the Kafka client library in /opt/flink/lib
> ** /opt/flink/lib/kafka-clients-2.2.0.jar
> * That resolved the issue.
> * But there are some odd points:
> ** Two weeks ago, Flink 1.8.1 had no problem.
> ** One week ago I rolled back from 1.9.0 to 1.8.1, and the same errors
> occurred.
> ** Maybe the Docker image was changed in the Docker repository
> ([https://github.com/docker-flink/docker-flink])
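> With kafka-clients in /opt/flink/lib, the fat JAR should no longer bundle its own copy; otherwise two copies exist on the classpath and the child-first user classloader may still serve the one that gets unloaded on cancel. A minimal sketch of the dependency change, assuming a Maven build (the coordinates are the standard kafka-clients ones, the version matches our cluster):
>
> {code:xml}
> <!-- Mark kafka-clients as "provided" so it is NOT packaged into the fat JAR; -->
> <!-- the copy in /opt/flink/lib then becomes the only one on the classpath. -->
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>2.2.0</version>
>     <scope>provided</scope>
> </dependency>
> {code}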
>
> h2. Suggestion
> * I'd like to know the exact reason this error occurs after upgrading to
> 1.9.0.
> * Does anybody know a better solution for this case?
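> One possible alternative to investigate, assuming the root cause is the child-first user classloader dropping the Kafka SASL classes on job cancel: tell Flink to resolve Kafka classes parent-first. This is only a sketch and still requires the kafka-clients jar to be present in /opt/flink/lib, since the parent classloader must be able to find the classes:
>
> {code:yaml}
> # flink-conf.yaml
> # Load org.apache.kafka classes from the parent (cluster) classloader instead of
> # the per-job classloader, so they survive job cancellation.
> classloader.parent-first-patterns.additional: org.apache.kafka
> {code}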
>
> Thank you in advance.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)