boscodurai commented on a change in pull request #42: DINF-1390 Added start of implementation for ElasticSearch write/searc… URL: https://github.com/apache/ranger/pull/42#discussion_r342401206
########## File path: agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHost; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.*; + + +public class ElasticSearchAuditDestination extends AuditDestination { + private static final Log LOG = LogFactory.getLog(ElasticSearchAuditDestination.class); + + public static final String ELASTICSEARCH_URLS = "urls"; + public static final String ELASTICSEARCH_PORT = "port"; + public static final String ELASTICSEARCH_PROTOCOL = "protocol"; + + public ElasticSearchAuditDestination() { + } + + private volatile 
RestHighLevelClient client = null; + + @Override + public void init(Properties props, String propPrefix) { + LOG.info("init() called"); + super.init(props, propPrefix); + connect(); + } + + @Override + public void stop() { + super.stop(); + logStatus(); + } + + @Override + public boolean log(Collection<AuditEventBase> events) { + boolean ret = false; + try { + logStatusIfRequired(); + addTotalCount(events.size()); + + if (client == null) { + connect(); + if (client == null) { + // Solr is still not initialized. So need return error + addDeferredCount(events.size()); + return ret; + } + } + + final Collection<Map<String, Object>> docs = new ArrayList<Map<String, Object>>(); + for (AuditEventBase event : events) { + AuthzAuditEvent authzEvent = (AuthzAuditEvent) event; + docs.add(toDoc(authzEvent)); + } + try { + IndexRequest indexRequest = new IndexRequest("posts").id("1").source(docs.toArray()); + IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT); + + if (response.status().getStatus() >= 400) { + addFailedCount(events.size()); + logFailedEvent(events, response.toString()); + } else { + addSuccessCount(events.size()); + ret = true; + } + } catch (Exception ex) { + addFailedCount(events.size()); + logFailedEvent(events, ex); + } + } catch (Throwable t) { + addDeferredCount(events.size()); + logError("Error sending message to Solr", t); + } + return ret; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + + } + + public boolean isAsync() { + return true; + } + + synchronized void connect() { + RestHighLevelClient me = client; + if (me == null) { + synchronized (ElasticSearchAuditDestination.class) { + me = client; + if (client == null) { + String urls = MiscUtil.getStringProperty(props, propPrefix + "." + ELASTICSEARCH_URLS); + String protocol = getStringProperty(props, propPrefix + "." 
+ ELASTICSEARCH_PROTOCOL, "http"); + int port = MiscUtil.getIntProperty(props, propPrefix + "." + ELASTICSEARCH_PORT, 8080); + if (urls != null) { + urls = urls.trim(); + } + if (urls != null && urls.equalsIgnoreCase("NONE")) { + urls = null; + } + + try { + me = client = new RestHighLevelClient( + RestClient.builder( + MiscUtil.toArray(urls, ",").stream() + .map(x -> new HttpHost(x, port, protocol)) + .<HttpHost>toArray(i -> new HttpHost[i]) + )); + ; + } catch (Throwable t) { + LOG.fatal("Can't connect to Solr server. urls=" + urls, t); Review comment: Based on the logic, connect() is called every time log() is called while the connection is not established. So if Elasticsearch is not available, we will keep logging at FATAL level continuously. Since plugins run within the process of the service, that service's log will be bombarded with fatal messages. I feel we should limit how often this is logged, probably by keeping track of how many times we log per interval. E.g. we can have a static variable that holds the last logged time and a configurable interval between log messages. If (currentTime - lastLogTime) > interval, then log... Just a suggestion ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
