Keith Wall created KAFKA-17134:
----------------------------------

             Summary: Restarting a server (same JVM) configured for OAUTHBEARER 
fails with RejectedExecutionException
                 Key: KAFKA-17134
                 URL: https://issues.apache.org/jira/browse/KAFKA-17134
             Project: Kafka
          Issue Type: Bug
            Reporter: Keith Wall


If you programmatically restart a server (3.7.1) configured for OAUTHBEARER 
{*}within the same JVM{*}, the startup attempt fails with the stack trace given 
below.

The issue is that a closed {{VerificationKeyResolver}} gets left behind in
{{OAuthBearerValidatorCallbackHandler.VERIFICATION_KEY_RESOLVER_CACHE}} 
after the server is shut down.  On restart, because the server's config is 
unchanged, the closed {{VerificationKeyResolver}} gets reused.  The 
{{ScheduledThreadPoolExecutor}} is already terminated, so the {{init}} call 
fails with a {{RejectedExecutionException}}.
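
The failure mode can be illustrated without Kafka. The following is a minimal sketch using only the JDK; {{Resolver}}, {{CACHE}}, {{acquire}}, {{release}} and {{restartFails}} are hypothetical names chosen for illustration and are not the Kafka classes or their actual logic. It assumes a static cache keyed by configuration whose entries own a scheduled executor, and contrasts closing an entry without evicting it against closing it with eviction.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StaleCacheSketch {

    // Hypothetical stand-in for a resolver that owns a refresh executor.
    static final class Resolver implements AutoCloseable {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        void init() {
            // Throws RejectedExecutionException once the executor has been shut down.
            executor.scheduleAtFixedRate(() -> { }, 0, 1, TimeUnit.HOURS);
        }

        @Override
        public void close() {
            executor.shutdownNow();
        }
    }

    // Hypothetical static cache: it outlives a "server" restarted in the same JVM.
    static final Map<String, Resolver> CACHE = new ConcurrentHashMap<>();

    // Models server startup: look up (or create) the resolver for this config, then init it.
    static Resolver acquire(String configKey) {
        Resolver resolver = CACHE.computeIfAbsent(configKey, k -> new Resolver());
        resolver.init();
        return resolver;
    }

    // Models server shutdown. With evict == false the closed resolver stays cached.
    static void release(String configKey, Resolver resolver, boolean evict) {
        resolver.close();
        if (evict) {
            CACHE.remove(configKey, resolver);
        }
    }

    // Starts, stops and restarts with the same config; returns true if the restart fails.
    static boolean restartFails(boolean evictOnClose) {
        String key = "jwks-config-" + evictOnClose;
        Resolver first = acquire(key);
        release(key, first, evictOnClose);
        try {
            Resolver second = acquire(key);
            release(key, second, true);
            return false;
        } catch (RejectedExecutionException e) {
            CACHE.remove(key); // tidy up the stale entry
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("without eviction, restart fails: " + restartFails(false));
        System.out.println("with eviction, restart fails: " + restartFails(true));
    }
}
```

In this sketch the restart only succeeds when the closed entry is removed from the cache, so that the second startup builds a fresh resolver with a live executor. Whether eviction on close is the right fix for Kafka itself is not established here.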
 
A reproducer for this problem can be found here: 
[https://github.com/k-wall/oauth_bearer_leak/blob/main/src/main/java/OAuthBearerValidatorLeak.java#L51]

The reproducer can be used with this OAuth Server:

{{docker run --rm -p 8080:8080 ghcr.io/navikt/mock-oauth2-server:2.1.8}}

 

{{Exception in thread "main" org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver}}
{{    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)}}
{{    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)}}
{{    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)}}
{{    at kafka.network.Processor.<init>(SocketServer.scala:973)}}
{{    at kafka.network.Acceptor.newProcessor(SocketServer.scala:879)}}
{{    at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849)}}
{{    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)}}
{{    at kafka.network.Acceptor.addProcessors(SocketServer.scala:848)}}
{{    at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523)}}
{{    at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251)}}
{{    at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)}}
{{    at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)}}
{{    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)}}
{{    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)}}
{{    at scala.collection.AbstractIterable.foreach(Iterable.scala:933)}}
{{    at kafka.network.SocketServer.<init>(SocketServer.scala:175)}}
{{    at kafka.server.BrokerServer.startup(BrokerServer.scala:255)}}
{{    at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)}}
{{    at kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)}}
{{    at scala.Option.foreach(Option.scala:437)}}
{{    at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)}}
{{    at OAuthBearerValidatorLeak.main(OAuthBearerValidatorLeak.java:51)}}
{{Caused by: org.apache.kafka.common.KafkaException: The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver}}
{{    at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.init(OAuthBearerValidatorCallbackHandler.java:146)}}
{{    at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.configure(OAuthBearerValidatorCallbackHandler.java:136)}}
{{    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:151)}}
{{    ... 21 more}}
{{Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4f66ffc8[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1bc49bc5[Wrapped task = org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwks$$Lambda/0x00000001373c7c88@7b6e5c12]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@39e67516[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]}}
{{    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)}}
{{    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)}}
{{    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)}}
{{    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:632)}}
{{    at java.base/java.util.concurrent.Executors$DelegatedScheduledExecutorService.scheduleAtFixedRate(Executors.java:870)}}
{{    at org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwks.init(RefreshingHttpsJwks.java:198)}}
{{    at org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwksVerificationKeyResolver.init(RefreshingHttpsJwksVerificationKeyResolver.java:103)}}
{{    at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler$RefCountingVerificationKeyResolver.init(OAuthBearerValidatorCallbackHandler.java:266)}}
{{    at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.init(OAuthBearerValidatorCallbackHandler.java:144)}}
{{    ... 23 more}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
