[ 
https://issues.apache.org/jira/browse/RANGER-5222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17956832#comment-17956832
 ] 

Abhishek Kumar commented on RANGER-5222:
----------------------------------------

Could you please try with the latest Ranger 2.6 release? Thanks.
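
To confirm whether the class named in the stack trace quoted below is visible, before and after upgrading, a check along these lines may help. It is a minimal sketch: the class name `Log4jVisibilityCheck` and the helper `isVisible` are hypothetical, and it probes the thread context class loader, whereas inside the broker the relevant loader is the RangerPluginClassLoader shown in the trace.

```java
/** Hypothetical diagnostic: reports whether a class can be located by a given class loader. */
public class Log4jVisibilityCheck {

    /** Returns true if the loader can find the class; the class is not initialized. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate and load the class, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the context class loader is a reasonable stand-in for a quick check
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String name = "org.apache.logging.log4j.LogManager";
        System.out.println(name + " visible: " + isVisible(name, loader));
    }
}
```

Running it with the same classpath as the plugin's implementation directory gives a rough indication of whether a log4j 2 API jar is present there.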

> The Kafka plugin throws a NoClassDefFoundError due to a missing log4j 
> dependency.
> ---------------------------------------------------------------------------------
>
>                 Key: RANGER-5222
>                 URL: https://issues.apache.org/jira/browse/RANGER-5222
>             Project: Ranger
>          Issue Type: Bug
>          Components: audit
>    Affects Versions: 2.1.0
>            Reporter: Wenyue Li
>            Priority: Major
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> When {{xasecure.audit.destination.elasticsearch}} is set to {{true}} in the
> Kafka plugin configuration and only one Elasticsearch node is online (or
> Elasticsearch is not functioning properly), an exception is thrown while the
> {{RestHighLevelClient}} is being initialized, and {{newClient()}} returns
> {{null}}. The code is as follows:
> {code:java}
> package org.apache.ranger.audit.destination;
> public class ElasticSearchAuditDestination extends AuditDestination {
>     private RestHighLevelClient newClient() {
>         try {
>             if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
>                     && password.contains("keytab") && new File(password).exists()) {
>                 subject = CredentialsProviderUtil.login(user, password);
>             }
>             RestClientBuilder restClientBuilder =
>                     getRestClientBuilder(hosts, protocol, user, password, port);
>             RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Initialized client");
>             }
>             boolean exits = false;
>             try {
>                 exits = restHighLevelClient.indices()
>                         .open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
>                         .isShardsAcknowledged();
>             } catch (Exception e) {
>                 LOG.warn("Error validating index " + this.index);
>             }
>             if (exits) {
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("Index exists");
>                 }
>             } else {
>                 LOG.info("Index does not exist");
>             }
>             return restHighLevelClient;
>         } catch (Throwable t) {
>             lastLoggedAt.updateAndGet(lastLoggedAt -> {
>                 long now = System.currentTimeMillis();
>                 long elapsed = now - lastLoggedAt;
>                 if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
>                     LOG.fatal("Can't connect to ElasticSearch server: " + connectionString(), t);
>                     return now;
>                 } else {
>                     return lastLoggedAt;
>                 }
>             });
>             return null;
>         }
>     }
> }{code}
> The root cause is that when the Elasticsearch cluster is running with a
> single node, the response entity is not null, so {{parseResponseException}}
> calls {{parseEntity}} with {{BytesRestResponse::errorFromXContent}}. Static
> initialization of {{BytesRestResponse}} then fails because
> {{LogManager.getLogger}} cannot find the log4j dependency. As a result, the
> {{RestHighLevelClient}} is initialized again each time an audit log is
> written, creating a large number of threads and causing thread leaks, which
> eventually prevents the Kafka cluster from functioning properly. The code is
> as follows:
> {code:java}
> package org.elasticsearch.client;
> protected final ElasticsearchStatusException parseResponseException(ResponseException responseException) {
>     Response response = responseException.getResponse();
>     HttpEntity entity = response.getEntity();
>     ElasticsearchStatusException elasticsearchException;
>     RestStatus restStatus = RestStatus.fromCode(response.getStatusLine().getStatusCode());
>     if (entity == null) {
>         elasticsearchException = new ElasticsearchStatusException(
>                 responseException.getMessage(), restStatus, responseException);
>     } else {
>         try {
>             elasticsearchException = parseEntity(entity, BytesRestResponse::errorFromXContent);
>             elasticsearchException.addSuppressed(responseException);
>         } catch (Exception e) {
>             elasticsearchException = new ElasticsearchStatusException(
>                     "Unable to parse response body", restStatus, responseException);
>             elasticsearchException.addSuppressed(e);
>         }
>     }
>     return elasticsearchException;
> } {code}
> {code:java}
> package org.elasticsearch.rest;
> private static final Logger SUPPRESSED_ERROR_LOGGER = LogManager.getLogger("rest.suppressed"); {code}
> Full error attached:
> {code:java}
> FATAL Can't connect to ElasticSearch server: User:elastic, http://xxx,xxx,xxx:9201/ranger_audits (org.apache.ranger.audit.destination.ElasticSearchAuditDestination)
> java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>  at org.elasticsearch.rest.BytesRestResponse.<clinit>(BytesRestResponse.java:120)
>  at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
>  at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
>  at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
>  at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
>  at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
>  at org.elasticsearch.client.IndicesClient.open(IndicesClient.java:467)
>  at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.newClient(ElasticSearchAuditDestination.java:253)
>  at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getClient(ElasticSearchAuditDestination.java:184)
>  at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.init(ElasticSearchAuditDestination.java:98)
>  at org.apache.ranger.audit.provider.AuditProviderFactory.init(AuditProviderFactory.java:181)
>  at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:175)
>  at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:118)
>  at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:94)
>  at kafka.security.authorizer.AuthorizerWrapper.configure(AuthorizerWrapper.scala:40)
>  at kafka.server.KafkaServer.$anonfun$startup$4(KafkaServer.scala:297)
>  at kafka.server.KafkaServer.$anonfun$startup$4$adapted(KafkaServer.scala:297)
>  at scala.Option.foreach(Option.scala:437)
>  at kafka.server.KafkaServer.startup(KafkaServer.scala:297)
>  at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>  at kafka.Kafka$.main(Kafka.scala:84)
>  at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.LogManager
>  at java.lang.ClassLoader.findClass(ClassLoader.java:530)
>  at org.apache.ranger.plugin.classloader.RangerPluginClassLoader$MyClassLoader.findClass(RangerPluginClassLoader.java:290)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  at org.apache.ranger.plugin.classloader.RangerPluginClassLoader.loadClass(RangerPluginClassLoader.java:132)
>  ... 22 more {code}
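
The thread leak described above follows from client creation being retried on every audit write after newClient() returns null. One possible mitigation, sketched here under assumptions, is to remember a failed attempt and skip further attempts until a retry interval has passed. `GuardedClientHolder`, its `get(long)` method, and the retry interval are hypothetical names for illustration and are not part of Ranger; a plain `Supplier` stands in for newClient().

```java
import java.util.function.Supplier;

/** Hypothetical holder that limits how often a failing client factory is retried. */
public class GuardedClientHolder<T> {
    private final Supplier<T> factory;   // stands in for newClient()
    private final long retryIntervalMs;  // minimum gap between creation attempts
    private T client;                    // null until creation succeeds
    private boolean attempted;           // true once creation has been tried
    private long lastAttemptAtMs;        // time of the most recent attempt

    public GuardedClientHolder(Supplier<T> factory, long retryIntervalMs) {
        this.factory = factory;
        this.retryIntervalMs = retryIntervalMs;
    }

    /** Returns the client, or null if creation failed and the retry interval has not elapsed. */
    public synchronized T get(long nowMs) {
        if (client != null) {
            return client;
        }
        if (attempted && nowMs - lastAttemptAtMs < retryIntervalMs) {
            return null; // still backing off: do not create another client
        }
        attempted = true;
        lastAttemptAtMs = nowMs;
        try {
            client = factory.get();
        } catch (Throwable t) {
            // Throwable, as in the quoted newClient(): NoClassDefFoundError is an Error
            client = null;
        }
        return client;
    }
}
```

The caller passes the current time explicitly (for example `System.currentTimeMillis()`), which keeps the backoff easy to test. This only limits how often clients are created; it does not by itself close a client that was built before the failure, which real code would also need to handle.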



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
