[ 
https://issues.apache.org/jira/browse/RANGER-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Prostran updated RANGER-4940:
----------------------------------
    Description: 
A NullPointerException occurs in RangerKafkaAuthorizer during the execution of 
the callRangerPlugin method, specifically when calling 
auditHandler.flushAudit() because auditHandler is null. The issue arises from 
the constructor of RangerKafkaAuthorizer being called twice, while the 
configure method is called only once. Since auditHandler is initialized in the 
configure method, the second constructor call without initialization via the 
configure() method causes auditHandler to remain uninitialized, leading to the 
following exception.
 
{code:java}
[2024-09-25 12:40:30,785] DEBUG <== RangerPluginClassLoader.deactivate() 
(org.apache.ranger.plugin.classloader.RangerPluginClassLoader)
[2024-09-25 12:40:30,785] ERROR [KafkaApi-1] Unexpected error handling request 
RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=console-consumer, 
correlationId=0, headerVersio
n=2) – FindCoordinatorRequestData(key='', keyType=0, 
coordinatorKeys=[console-consumer-34426]) with context 
RequestContext(header=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clie
ntId=console-consumer, correlationId=0, headerVersion=2), 
connectionId='10.0.0.59:9094-10.0.0.59:56938-0', clientAddress=/10.0.0.59, 
principal=User:kafka, listenerName=ListenerName(KERBERO
S), securityProtocol=SASL_PLAINTEXT, 
clientInformation=ClientInformation(softwareName=apache-kafka-java, 
softwareVersion=7.7.0-ccs), fromPrivilegedListener=false, 
principalSerde=Optional[o
rg.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@104e7feb])
 (kafka.server.KafkaApis)
java.lang.NullPointerException
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.callRangerPlugin(RangerKafkaAuthorizer.java:303)
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.wrappedAuthorization(RangerKafkaAuthorizer.java:288)
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:246)
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:136)
        at kafka.server.AuthHelper.$anonfun$authorize$1(AuthHelper.scala:53)
        at 
kafka.server.AuthHelper.$anonfun$authorize$1$adapted(AuthHelper.scala:50)
        at scala.Option.forall(Option.scala:420)
        at kafka.server.AuthHelper.authorize(AuthHelper.scala:50)
        at kafka.server.KafkaApis.getCoordinator(KafkaApis.scala:1647)
        at 
kafka.server.KafkaApis.$anonfun$handleFindCoordinatorRequestV4AndAbove$1(KafkaApis.scala:1601)
        at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
        at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
        at 
scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:138)
        at 
kafka.server.KafkaApis.handleFindCoordinatorRequestV4AndAbove(KafkaApis.scala:1600)
        at 
kafka.server.KafkaApis.handleFindCoordinatorRequest(KafkaApis.scala:1593)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:194)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:160)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}
 
The problematic method
{code:java}
  auditHandler.flushAudit(); RangerKafkaAuthorizer.java:303
{code}
{code:java}
 private Collection<RangerAccessResult> 
callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
    try {
      return rangerPlugin.isAccessAllowed(rangerRequests);
    } catch (Throwable t) {
      logger.error("Error while calling isAccessAllowed(). requests={}", 
rangerRequests, t);
      return null;
    } finally {
      auditHandler.flushAudit();
    }
  }
{code}
Console consumer also reports the following error:
{code:java}
[2024-09-25 12:40:30,784] WARN [Consumer clientId=console-consumer, 
groupId=console-consumer-34426] Error while fetching metadata with correlation 
id 2 : \{ranger_audits=UNKNOWN_SERVER_ERROR} 
(org.apache.kafka.clients.NetworkClient)
[2024-09-25 12:40:30,791] ERROR Error processing message, terminating consumer 
process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
[2024-09-25 12:40:30,870] WARN [Principal=kafka/governance-test.lan.croz.net]: 
TGT renewal thread has been interrupted and will exit. 
(org.apache.kafka.common.security.kerberos.KerberosLogin)
Processed a total of 0 messages
{code}
Quick Workaround for Testing Purposes:

To ensure single initialization across multiple constructor calls, make 
auditHandler static and volatile:
{code:java}
private static volatile RangerKafkaAuditHandler auditHandler = null;{code}
Original source:
{code:java}
private static volatile RangerBasePlugin rangerPlugin = null;
RangerKafkaAuditHandler auditHandler = null;
{code}

  was:
A NullPointerException occurs in RangerKafkaAuthorizer during the execution of 
the callRangerPlugin method, specifically when calling 
auditHandler.flushAudit() because auditHandler is null. The issue arises from 
the constructor of RangerKafkaAuthorizer being called twice, while the 
configure method is called only once. Since auditHandler is initialized in the 
configure method, the second constructor call without initialization via the 
configure() method causes auditHandler to remain uninitialized, leading to the 
following exception.
 
{code:java}
[2024-09-25 12:40:30,785] DEBUG <== RangerPluginClassLoader.deactivate() 
(org.apache.ranger.plugin.classloader.RangerPluginClassLoader)
[2024-09-25 12:40:30,785] ERROR [KafkaApi-1] Unexpected error handling request 
RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=console-consumer, 
correlationId=0, headerVersio
n=2) – FindCoordinatorRequestData(key='', keyType=0, 
coordinatorKeys=[console-consumer-34426]) with context 
RequestContext(header=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clie
ntId=console-consumer, correlationId=0, headerVersion=2), 
connectionId='10.0.0.59:9094-10.0.0.59:56938-0', clientAddress=/10.0.0.59, 
principal=User:kafka, listenerName=ListenerName(KERBERO
S), securityProtocol=SASL_PLAINTEXT, 
clientInformation=ClientInformation(softwareName=apache-kafka-java, 
softwareVersion=7.7.0-ccs), fromPrivilegedListener=false, 
principalSerde=Optional[o
rg.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@104e7feb])
 (kafka.server.KafkaApis)
java.lang.NullPointerException
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.callRangerPlugin(RangerKafkaAuthorizer.java:303)
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.wrappedAuthorization(RangerKafkaAuthorizer.java:288)
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:246)
        at 
org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:136)
        at kafka.server.AuthHelper.$anonfun$authorize$1(AuthHelper.scala:53)
        at 
kafka.server.AuthHelper.$anonfun$authorize$1$adapted(AuthHelper.scala:50)
        at scala.Option.forall(Option.scala:420)
        at kafka.server.AuthHelper.authorize(AuthHelper.scala:50)
        at kafka.server.KafkaApis.getCoordinator(KafkaApis.scala:1647)
        at 
kafka.server.KafkaApis.$anonfun$handleFindCoordinatorRequestV4AndAbove$1(KafkaApis.scala:1601)
        at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
        at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
        at 
scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:138)
        at 
kafka.server.KafkaApis.handleFindCoordinatorRequestV4AndAbove(KafkaApis.scala:1600)
        at 
kafka.server.KafkaApis.handleFindCoordinatorRequest(KafkaApis.scala:1593)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:194)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:160)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}
 
The problematic method
{code:java}
  auditHandler.flushAudit(); RangerKafkaAuthorizer.java:303
{code}
{code:java}
 private Collection<RangerAccessResult> 
callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
    try {
      return rangerPlugin.isAccessAllowed(rangerRequests);
    } catch (Throwable t) {
      logger.error("Error while calling isAccessAllowed(). requests={}", 
rangerRequests, t);
      return null;
    } finally {
      auditHandler.flushAudit();
    }
  }
{code}
Console consumer also reports the following error:
{code:java}
[2024-09-25 12:40:30,784] WARN [Consumer clientId=console-consumer, 
groupId=console-consumer-34426] Error while fetching metadata with correlation 
id 2 : \{ranger_audits=UNKNOWN_SERVER_ERROR} 
(org.apache.kafka.clients.NetworkClient)
[2024-09-25 12:40:30,791] ERROR Error processing message, terminating consumer 
process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
[2024-09-25 12:40:30,870] WARN 
[Principal=kafka/pbz-governance-test.lan.croz.net]: TGT renewal thread has been 
interrupted and will exit. 
(org.apache.kafka.common.security.kerberos.KerberosLogin)
Processed a total of 0 messages
{code}



Quick Workaround for Testing Purposes:

To ensure single initialization across multiple constructor calls, make 
auditHandler static and volatile:

{code:java}
private static volatile RangerKafkaAuditHandler auditHandler = null;{code}

Original source:

{code:java}
private static volatile RangerBasePlugin rangerPlugin = null;
RangerKafkaAuditHandler auditHandler = null;
{code}




> Null Pointer Exception in RangerKafkaAuthorizer - Null auditHandler in 
> callRangerPlugin Method
> ----------------------------------------------------------------------------------------------
>
>                 Key: RANGER-4940
>                 URL: https://issues.apache.org/jira/browse/RANGER-4940
>             Project: Ranger
>          Issue Type: Bug
>          Components: audit, plugins
>    Affects Versions: 3.0.0, 2.5.0
>         Environment: - Rocky Linux release 9.4 (Blue Onyx)
> - 5.14.0-427.31.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Aug 14 16:15:25 UTC 
> 2024 x86_64 x86_64 x86_64 GNU/Linux
> - java-11-openjdk-11.0.24.0.8-2.el9.x86_64
> - ranger-3.0.0-SNAPSHOT-admin.tar.gz
> - ranger-3.0.0-SNAPSHOT-kafka-plugin.tar.gz
>            Reporter: Ivan Prostran
>            Priority: Major
>              Labels: RangerKafkaAuthorizer, authorization-plugin
>
> A NullPointerException occurs in RangerKafkaAuthorizer during the execution 
> of the callRangerPlugin method, specifically when calling 
> auditHandler.flushAudit() because auditHandler is null. The issue arises from 
> the constructor of RangerKafkaAuthorizer being called twice, while the 
> configure method is called only once. Since auditHandler is initialized in 
> the configure method, the second constructor call without initialization via 
> the configure() method causes auditHandler to remain uninitialized, leading 
> to the following exception.
>  
> {code:java}
> [2024-09-25 12:40:30,785] DEBUG <== RangerPluginClassLoader.deactivate() 
> (org.apache.ranger.plugin.classloader.RangerPluginClassLoader)
> [2024-09-25 12:40:30,785] ERROR [KafkaApi-1] Unexpected error handling 
> request RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, 
> clientId=console-consumer, correlationId=0, headerVersio
> n=2) – FindCoordinatorRequestData(key='', keyType=0, 
> coordinatorKeys=[console-consumer-34426]) with context 
> RequestContext(header=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, 
> clie
> ntId=console-consumer, correlationId=0, headerVersion=2), 
> connectionId='10.0.0.59:9094-10.0.0.59:56938-0', clientAddress=/10.0.0.59, 
> principal=User:kafka, listenerName=ListenerName(KERBERO
> S), securityProtocol=SASL_PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=7.7.0-ccs), fromPrivilegedListener=false, 
> principalSerde=Optional[o
> rg.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@104e7feb])
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>         at 
> org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.callRangerPlugin(RangerKafkaAuthorizer.java:303)
>         at 
> org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.wrappedAuthorization(RangerKafkaAuthorizer.java:288)
>         at 
> org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:246)
>         at 
> org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.authorize(RangerKafkaAuthorizer.java:136)
>         at kafka.server.AuthHelper.$anonfun$authorize$1(AuthHelper.scala:53)
>         at 
> kafka.server.AuthHelper.$anonfun$authorize$1$adapted(AuthHelper.scala:50)
>         at scala.Option.forall(Option.scala:420)
>         at kafka.server.AuthHelper.authorize(AuthHelper.scala:50)
>         at kafka.server.KafkaApis.getCoordinator(KafkaApis.scala:1647)
>         at 
> kafka.server.KafkaApis.$anonfun$handleFindCoordinatorRequestV4AndAbove$1(KafkaApis.scala:1601)
>         at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>         at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>         at 
> scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:138)
>         at 
> kafka.server.KafkaApis.handleFindCoordinatorRequestV4AndAbove(KafkaApis.scala:1600)
>         at 
> kafka.server.KafkaApis.handleFindCoordinatorRequest(KafkaApis.scala:1593)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:194)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:160)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>  
> The problematic method
> {code:java}
>   auditHandler.flushAudit(); RangerKafkaAuthorizer.java:303
> {code}
> {code:java}
>  private Collection<RangerAccessResult> 
> callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
>     try {
>       return rangerPlugin.isAccessAllowed(rangerRequests);
>     } catch (Throwable t) {
>       logger.error("Error while calling isAccessAllowed(). requests={}", 
> rangerRequests, t);
>       return null;
>     } finally {
>       auditHandler.flushAudit();
>     }
>   }
> {code}
> Console consumer also reports the following error:
> {code:java}
> [2024-09-25 12:40:30,784] WARN [Consumer clientId=console-consumer, 
> groupId=console-consumer-34426] Error while fetching metadata with 
> correlation id 2 : \{ranger_audits=UNKNOWN_SERVER_ERROR} 
> (org.apache.kafka.clients.NetworkClient)
> [2024-09-25 12:40:30,791] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.
> [2024-09-25 12:40:30,870] WARN 
> [Principal=kafka/governance-test.lan.croz.net]: TGT renewal thread has been 
> interrupted and will exit. 
> (org.apache.kafka.common.security.kerberos.KerberosLogin)
> Processed a total of 0 messages
> {code}
> Quick Workaround for Testing Purposes:
> To ensure single initialization across multiple constructor calls, make 
> auditHandler static and volatile:
> {code:java}
> private static volatile RangerKafkaAuditHandler auditHandler = null;{code}
> Original source:
> {code:java}
> private static volatile RangerBasePlugin rangerPlugin = null;
> RangerKafkaAuditHandler auditHandler = null;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to