Wenyue Li created RANGER-5222:
---------------------------------

             Summary: The Kafka plugin throws a NoClassDefFoundError due to a missing log4j dependency.
                 Key: RANGER-5222
                 URL: https://issues.apache.org/jira/browse/RANGER-5222
             Project: Ranger
          Issue Type: Bug
          Components: audit
    Affects Versions: 2.1.0
            Reporter: Wenyue Li


When I set {{xasecure.audit.destination.elasticsearch}} to {{true}} in the 
Kafka plugin configuration and only one Elasticsearch node is online, or 
Elasticsearch is not functioning properly, an exception is thrown while the 
{{RestHighLevelClient}} is being initialized, and {{newClient()}} returns 
{{null}}. The code is as follows:
{code:java}
package org.apache.ranger.audit.destination;

public class ElasticSearchAuditDestination extends AuditDestination {
    private RestHighLevelClient newClient() {
        try {
            if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
                    && password.contains("keytab") && new File(password).exists()) {
                subject = CredentialsProviderUtil.login(user, password);
            }
            RestClientBuilder restClientBuilder =
                    getRestClientBuilder(hosts, protocol, user, password, port);
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Initialized client");
            }
            boolean exits = false;
            try {
                exits = restHighLevelClient.indices()
                        .open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
                        .isShardsAcknowledged();
            } catch (Exception e) {
                LOG.warn("Error validating index " + this.index);
            }
            if (exits) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Index exists");
                }
            } else {
                LOG.info("Index does not exist");
            }
            return restHighLevelClient;
        } catch (Throwable t) {
            lastLoggedAt.updateAndGet(lastLoggedAt -> {
                long now = System.currentTimeMillis();
                long elapsed = now - lastLoggedAt;
                if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
                    LOG.fatal("Can't connect to ElasticSearch server: " + connectionString(), t);
                    return now;
                } else {
                    return lastLoggedAt;
                }
            });
            return null;
        }
    }
}
{code}
The root cause is that when the Elasticsearch cluster is running with a single 
node, the response entity is not null, so {{parseResponseException}} calls 
{{parseEntity}} with {{BytesRestResponse::errorFromXContent}}. Loading the 
{{BytesRestResponse}} class then fails because its static 
{{LogManager.getLogger}} call cannot find the log4j dependency. As a result, a 
new {{RestHighLevelClient}} is initialized each time an audit log is written, 
which creates a large number of threads and leaks them, eventually preventing 
the Kafka cluster from functioning properly. The code is as follows:
{code:java}
package org.elasticsearch.client;

protected final ElasticsearchStatusException parseResponseException(ResponseException responseException) {
    Response response = responseException.getResponse();
    HttpEntity entity = response.getEntity();
    ElasticsearchStatusException elasticsearchException;
    RestStatus restStatus = RestStatus.fromCode(response.getStatusLine().getStatusCode());

    if (entity == null) {
        elasticsearchException = new ElasticsearchStatusException(
                responseException.getMessage(), restStatus, responseException);
    } else {
        try {
            elasticsearchException = parseEntity(entity, BytesRestResponse::errorFromXContent);
            elasticsearchException.addSuppressed(responseException);
        } catch (Exception e) {
            elasticsearchException = new ElasticsearchStatusException(
                    "Unable to parse response body", restStatus, responseException);
            elasticsearchException.addSuppressed(e);
        }
    }
    return elasticsearchException;
}
{code}
{code:java}
package org.elasticsearch.rest;

private static final Logger SUPPRESSED_ERROR_LOGGER = LogManager.getLogger("rest.suppressed");
{code}
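One detail worth spelling out: {{NoClassDefFoundError}} is a 
{{java.lang.Error}}, not an {{Exception}}, so neither the 
{{catch (Exception e)}} in {{parseResponseException}} nor the inner one in 
{{newClient()}} intercepts it. Only the outer {{catch (Throwable t)}} does, 
which is why {{newClient()}} returns {{null}}. The following is a minimal 
standalone sketch of that behavior. The names {{CatchScopeDemo}}, 
{{failingParse}} and {{probe}} are hypothetical, and the error is thrown 
directly rather than by a real static initializer.
{code:java}
// Hypothetical demo class; not part of Ranger or Elasticsearch.
class CatchScopeDemo {

    // Stands in for BytesRestResponse.<clinit> failing because log4j is missing.
    static void failingParse() {
        throw new NoClassDefFoundError("org/apache/logging/log4j/LogManager");
    }

    // Mirrors the nesting in newClient(): inner catch (Exception), outer catch (Throwable).
    static String probe() {
        try {
            try {
                failingParse();
                return "no error";
            } catch (Exception e) {
                // Never reached: NoClassDefFoundError extends Error, not Exception.
                return "caught as Exception";
            }
        } catch (Throwable t) {
            return "caught as Throwable: " + t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Prints "caught as Throwable: NoClassDefFoundError"
        System.out.println(probe());
    }
}
{code}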
Full error attached:
{code:java}
FATAL Can't connect to ElasticSearch server: User:elastic, http://xxx,xxx,xxx:9201/ranger_audits (org.apache.ranger.audit.destination.ElasticSearchAuditDestination)
java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
    at org.elasticsearch.rest.BytesRestResponse.<clinit>(BytesRestResponse.java:120)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
    at org.elasticsearch.client.IndicesClient.open(IndicesClient.java:467)
    at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.newClient(ElasticSearchAuditDestination.java:253)
    at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getClient(ElasticSearchAuditDestination.java:184)
    at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.init(ElasticSearchAuditDestination.java:98)
    at org.apache.ranger.audit.provider.AuditProviderFactory.init(AuditProviderFactory.java:181)
    at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:175)
    at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:118)
    at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:94)
    at kafka.security.authorizer.AuthorizerWrapper.configure(AuthorizerWrapper.scala:40)
    at kafka.server.KafkaServer.$anonfun$startup$4(KafkaServer.scala:297)
    at kafka.server.KafkaServer.$anonfun$startup$4$adapted(KafkaServer.scala:297)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:297)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.LogManager
    at java.lang.ClassLoader.findClass(ClassLoader.java:530)
    at org.apache.ranger.plugin.classloader.RangerPluginClassLoader$MyClassLoader.findClass(RangerPluginClassLoader.java:290)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.ranger.plugin.classloader.RangerPluginClassLoader.loadClass(RangerPluginClassLoader.java:132)
    ... 22 more
{code}
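The thread leak described above comes from building a new client on every 
audit write after a failed initialization, without closing the client that 
failed. One possible mitigation, shown here only as a sketch and not as the fix 
adopted by the Ranger project, is to close a client that fails validation and 
to wait a minimum interval before the next attempt. {{GuardedClientHolder}}, 
its constructor parameters and the {{validator}} callback are hypothetical 
names, and a plain {{AutoCloseable}} stands in for {{RestHighLevelClient}}.
{code:java}
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical holder; the type parameter C stands in for RestHighLevelClient.
class GuardedClientHolder<C extends AutoCloseable> {
    private final Supplier<C> factory;    // builds a new client
    private final Consumer<C> validator;  // may throw, e.g. the index check
    private final LongSupplier clock;     // current time in milliseconds
    private final long retryIntervalMs;   // minimum gap between attempts
    private C client;
    private boolean attempted;
    private long lastAttemptMs;

    GuardedClientHolder(Supplier<C> factory, Consumer<C> validator,
                        LongSupplier clock, long retryIntervalMs) {
        this.factory = factory;
        this.validator = validator;
        this.clock = clock;
        this.retryIntervalMs = retryIntervalMs;
    }

    // Returns the cached client, or null while initialization keeps failing.
    synchronized C get() {
        if (client != null) {
            return client;
        }
        long now = clock.getAsLong();
        if (attempted && now - lastAttemptMs < retryIntervalMs) {
            return null; // still backing off; do not build another client
        }
        attempted = true;
        lastAttemptMs = now;
        C candidate = null;
        try {
            candidate = factory.get();
            validator.accept(candidate);
            client = candidate;
        } catch (Throwable t) {
            // Throwable rather than Exception, so NoClassDefFoundError is handled too.
            closeQuietly(candidate); // release the failed client's resources
        }
        return client;
    }

    private static void closeQuietly(AutoCloseable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (Exception ignored) {
            // Nothing useful to do if closing a failed client also fails.
        }
    }

    public static void main(String[] args) {
        GuardedClientHolder<AutoCloseable> holder = new GuardedClientHolder<AutoCloseable>(
                () -> () -> System.out.println("closed failed client"),
                c -> { throw new NoClassDefFoundError("org/apache/logging/log4j/LogManager"); },
                System::currentTimeMillis,
                60_000L);
        System.out.println(holder.get()); // first attempt fails and closes the client
        System.out.println(holder.get()); // within the interval: no new client is built
    }
}
{code}
With this shape, a caller that receives {{null}} can skip or buffer the audit 
event instead of triggering another client construction.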



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
