[ https://issues.apache.org/jira/browse/RANGER-5222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17956832#comment-17956832 ]
Abhishek Kumar commented on RANGER-5222:
----------------------------------------

Could you please try with the latest Ranger 2.6 release? Thanks.

> The Kafka plugin throws a NoClassDefFoundError due to a missing log4j dependency.
> ---------------------------------------------------------------------------------
>
>                 Key: RANGER-5222
>                 URL: https://issues.apache.org/jira/browse/RANGER-5222
>             Project: Ranger
>          Issue Type: Bug
>          Components: audit
>    Affects Versions: 2.1.0
>            Reporter: Wenyue Li
>            Priority: Major
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> When I set {{xasecure.audit.destination.elasticsearch}} to {{true}} in the
> Kafka plugin configuration, if only one Elasticsearch node is online or
> Elasticsearch is not functioning properly, an exception is thrown during the
> initialization of the {{RestHighLevelClient}} class, and {{newClient()}}
> returns {{null}}. The code is as follows:
> {code:java}
> package org.apache.ranger.audit.destination;
>
> public class ElasticSearchAuditDestination extends AuditDestination {
>     private RestHighLevelClient newClient() {
>         try {
>             if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && password.contains("keytab") && new File(password).exists()) {
>                 subject = CredentialsProviderUtil.login(user, password);
>             }
>             RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, protocol, user, password, port);
>             RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Initialized client");
>             }
>             boolean exits = false;
>             try {
>                 exits = restHighLevelClient.indices().open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
>             } catch (Exception e) {
>                 // Only Exceptions are handled here; an Error such as
>                 // NoClassDefFoundError propagates to the outer catch.
>                 LOG.warn("Error validating index " + this.index);
>             }
>             if (exits) {
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("Index exists");
>                 }
>             } else {
>                 LOG.info("Index does not exist");
>             }
>             return restHighLevelClient;
>         } catch (Throwable t) {
>             // Log at most once per minute, then give up on this client.
>             lastLoggedAt.updateAndGet(lastLoggedAt -> {
>                 long now = System.currentTimeMillis();
>                 long elapsed = now - lastLoggedAt;
>                 if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
>                     LOG.fatal("Can't connect to ElasticSearch server: " + connectionString(), t);
>                     return now;
>                 } else {
>                     return lastLoggedAt;
>                 }
>             });
>             return null;
>         }
>     }
> }{code}
> The root cause is that when the Elasticsearch cluster is running with a
> single node, the response entity is not null, so {{parseResponseException}}
> calls {{parseEntity}} with {{BytesRestResponse::errorFromXContent}}.
> Initializing the {{BytesRestResponse}} class then fails because its static
> {{LogManager.getLogger}} call cannot find the log4j dependency. As a result,
> {{newClient()}} returns {{null}} and a new {{RestHighLevelClient}} is
> initialized each time an audit log is written, which creates a large number
> of threads, leaks them, and eventually prevents the Kafka cluster from
> functioning properly. The code is as follows:
> {code:java}
> package org.elasticsearch.client;
>
> // Excerpt from RestHighLevelClient.
> protected final ElasticsearchStatusException parseResponseException(ResponseException responseException) {
>     Response response = responseException.getResponse();
>     HttpEntity entity = response.getEntity();
>     ElasticsearchStatusException elasticsearchException;
>     RestStatus restStatus = RestStatus.fromCode(response.getStatusLine().getStatusCode());
>     if (entity == null) {
>         elasticsearchException = new ElasticsearchStatusException(
>             responseException.getMessage(), restStatus, responseException);
>     } else {
>         try {
>             // First use of BytesRestResponse; triggers its static initializer.
>             elasticsearchException = parseEntity(entity, BytesRestResponse::errorFromXContent);
>             elasticsearchException.addSuppressed(responseException);
>         } catch (Exception e) {
>             elasticsearchException = new ElasticsearchStatusException("Unable to parse response body", restStatus, responseException);
>             elasticsearchException.addSuppressed(e);
>         }
>     }
>     return elasticsearchException;
> } {code}
> {code:java}
> package org.elasticsearch.rest;
>
> // Excerpt from BytesRestResponse; this static field needs log4j at class-initialization time.
> private static final Logger SUPPRESSED_ERROR_LOGGER = LogManager.getLogger("rest.suppressed"); {code}
> Full error attached:
> {code:java}
> FATAL Can't connect to ElasticSearch server: User:elastic, http://xxx,xxx,xxx:9201/ranger_audits (org.apache.ranger.audit.destination.ElasticSearchAuditDestination)
> java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>     at org.elasticsearch.rest.BytesRestResponse.<clinit>(BytesRestResponse.java:120)
>     at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
>     at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
>     at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
>     at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
>     at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
>     at org.elasticsearch.client.IndicesClient.open(IndicesClient.java:467)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.newClient(ElasticSearchAuditDestination.java:253)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getClient(ElasticSearchAuditDestination.java:184)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.init(ElasticSearchAuditDestination.java:98)
>     at org.apache.ranger.audit.provider.AuditProviderFactory.init(AuditProviderFactory.java:181)
>     at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:175)
>     at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:118)
>     at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:94)
>     at kafka.security.authorizer.AuthorizerWrapper.configure(AuthorizerWrapper.scala:40)
>     at kafka.server.KafkaServer.$anonfun$startup$4(KafkaServer.scala:297)
>     at kafka.server.KafkaServer.$anonfun$startup$4$adapted(KafkaServer.scala:297)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:297)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>     at kafka.Kafka$.main(Kafka.scala:84)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.LogManager
>     at java.lang.ClassLoader.findClass(ClassLoader.java:530)
>     at org.apache.ranger.plugin.classloader.RangerPluginClassLoader$MyClassLoader.findClass(RangerPluginClassLoader.java:290)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at org.apache.ranger.plugin.classloader.RangerPluginClassLoader.loadClass(RangerPluginClassLoader.java:132)
>     ... 22 more {code}
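
The {{ClassNotFoundException}} for {{org.apache.logging.log4j.LogManager}} reported in this issue can be checked in isolation with a small classpath probe. The following is only a minimal sketch: the class name {{Log4jClasspathProbe}} and its helper method are hypothetical and not part of Ranger or Elasticsearch, and it probes the thread context class loader as a stand-in. Inside a Kafka broker the lookup goes through the Ranger plugin class loader, so the probe would need to run with that loader (or with the same jar set) to be conclusive.

```java
// Hypothetical diagnostic; not part of Ranger or Elasticsearch.
final class Log4jClasspathProbe {

    // Class named in the NoClassDefFoundError from the report.
    static final String LOG4J_LOG_MANAGER = "org.apache.logging.log4j.LogManager";

    /** Returns true if the given loader can resolve the class; the class is not initialized. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class whose own dependencies cannot be linked.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the context class loader sees the same jars as the code under test.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(LOG4J_LOG_MANAGER + " loadable: " + isLoadable(LOG4J_LOG_MANAGER, loader));
    }
}
```

Running it with the same classpath as the plugin's library directory prints whether the log4j API class is visible there. If it is not, the static logger field in {{BytesRestResponse}} would be expected to fail in the way the stack trace shows.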