http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java index c57c028..5fc2e14 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java @@ -27,17 +27,15 @@ import org.apache.log4j.Logger; public enum LogsearchReaderFactory { INSTANCE; - private static Logger logger = Logger - .getLogger(LogsearchReaderFactory.class); + private static final Logger LOG = Logger.getLogger(LogsearchReaderFactory.class); public Reader getReader(File file) throws FileNotFoundException { - logger.debug("Inside reader factory for file:" + file); + LOG.debug("Inside reader factory for file:" + file); if (GZIPReader.isValidFile(file.getAbsolutePath())) { - logger.info("Reading file " + file + " as gzip file"); + LOG.info("Reading file " + file + " as gzip file"); return new GZIPReader(file.getAbsolutePath()); } else { return new FileReader(file); } } - }
http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java deleted file mode 100644 index ae0cfc0..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.logconfig; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.TimeZone; - -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logfeeder.util.SolrUtil; -import org.apache.ambari.logfeeder.view.VLogfeederFilter; -import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper; -import org.apache.log4j.Logger; - -public class FetchConfigFromSolr extends Thread { - private static Logger logger = Logger.getLogger(FetchConfigFromSolr.class); - private static VLogfeederFilterWrapper logfeederFilterWrapper = null; - private static int solrConfigInterval = 5;// 5 sec; - private static long delay; - private static String endTimeDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS";//2016-04-05T04:30:00.000Z - private static String sysTimeZone = "GMT"; - - FetchConfigFromSolr(boolean isDaemon) { - this.setName(this.getClass().getSimpleName()); - this.setDaemon(isDaemon); - } - - @Override - public void run() { - String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string"); - String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url"); - if ((zkConnectString == null || zkConnectString.trim().length() == 0 ) - && (solrUrl == null || solrUrl.trim().length() == 0)) { - logger.warn("Neither Solr ZK Connect String nor solr Uril for UserConfig/History is set." + - "Won't look for level configuration from Solr."); - return; - } - solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", solrConfigInterval); - delay = 1000 * solrConfigInterval; - do { - logger.debug("Updating config from solr after every " + solrConfigInterval + " sec."); - pullConfigFromSolr(); - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - logger.error(e.getLocalizedMessage(), e.getCause()); - } - } while (true); - } - - private synchronized void pullConfigFromSolr() { - SolrUtil solrUtil = SolrUtil.getInstance(); - if(solrUtil!=null){ - HashMap<String, Object> configDocMap = solrUtil.getConfigDoc(); - if (configDocMap != null) { - String configJson = (String) configDocMap.get(LogFeederConstants.VALUES); - if (configJson != null) { - logfeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, VLogfeederFilterWrapper.class); - } - } - } - } - - private static boolean isFilterExpired(VLogfeederFilter logfeederFilter) { - boolean isFilterExpired = false;// default is false - if (logfeederFilter != null) { - Date filterEndDate = parseFilterExpireDate(logfeederFilter); - if (filterEndDate != null) { - Date currentDate = getCurrentDate(); - if (currentDate.compareTo(filterEndDate) >= 0) { - logger.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts :" - + listToStr(logfeederFilter.getHosts()) + "Filter is expired because of filter endTime : " - + dateToStr(filterEndDate) + " is older than currentTime :" + dateToStr(currentDate)); - isFilterExpired = true; - } - } - } - return isFilterExpired; - } - - private static String dateToStr(Date date) { - if (date == null) { - return ""; - } - SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat); - TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone); - formatter.setTimeZone(timeZone); - return formatter.format(date); - } - - private static Date parseFilterExpireDate(VLogfeederFilter vLogfeederFilter) { - String expiryTime = vLogfeederFilter.getExpiryTime(); - if (expiryTime != null && !expiryTime.isEmpty()) { - SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat); - TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone); - formatter.setTimeZone(timeZone); - try { - return formatter.parse(expiryTime); - } catch (ParseException e) { - logger.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel() - + " and hosts :" + listToStr(vLogfeederFilter.getHosts())); - } - } - return null; - } - - public static List<String> getAllowedLevels(String hostName, VLogfeederFilter componentFilter) { - String componentName = componentFilter.getLabel(); - List<String> hosts = componentFilter.getHosts(); - List<String> defaultLevels = componentFilter.getDefaultLevels(); - List<String> overrideLevels = componentFilter.getOverrideLevels(); - String expiryTime=componentFilter.getExpiryTime(); - //check is user override or not - if ((expiryTime != null && !expiryTime.isEmpty()) - || (overrideLevels != null && !overrideLevels.isEmpty()) - || (hosts != null && !hosts.isEmpty())) { - if (hosts == null || hosts.isEmpty()) { - // hosts list is empty or null consider it apply on all hosts - hosts.add(LogFeederConstants.ALL); - } - if (LogFeederUtil.isListContains(hosts, hostName, false)) { - if (isFilterExpired(componentFilter)) { - logger.debug("Filter for component " + componentName + " and host :" - + hostName + " is expired at " + componentFilter.getExpiryTime()); - return defaultLevels; - } else { - return overrideLevels; - } - } - } - return defaultLevels; - } - - public static boolean isFilterAvailable() { - return logfeederFilterWrapper != null; - } - - public static VLogfeederFilter findComponentFilter(String componentName) { - if (logfeederFilterWrapper != null) { - HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter(); - if (filter != null) { - VLogfeederFilter componentFilter = filter.get(componentName); - if (componentFilter != null) { - return componentFilter; - } - } - } - logger.trace("Filter is not there for component :" + componentName); - return null; - } - - - public static Date getCurrentDate() { - TimeZone.setDefault(TimeZone.getTimeZone(sysTimeZone)); - Date date = new Date(); - return date; - } - - public static String listToStr(List<String> strList) { - StringBuilder out = new StringBuilder("["); - if (strList != null) { - int counter = 0; - for (Object o : strList) { - if (counter > 0) { - out.append(","); - } - out.append(o.toString()); - counter++; - } - } - out.append("]"); - return out.toString(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java new file mode 100644 index 0000000..801a289 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.logconfig; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; + +/** + * Read configuration from solr and filter the log + */ +public enum FilterLogData { + INSTANCE; + + private static final Logger LOG = Logger.getLogger(FilterLogData.class); + + private static final boolean DEFAULT_VALUE = true; + + public boolean isAllowed(String jsonBlock) { + if (StringUtils.isEmpty(jsonBlock)) { + return DEFAULT_VALUE; + } + Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock); + return isAllowed(jsonObj); + } + + public boolean isAllowed(Map<String, Object> jsonObj) { + boolean isAllowed = applyFilter(jsonObj); + if (!isAllowed) { + LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); + } + return isAllowed; + } + + + private boolean applyFilter(Map<String, Object> jsonObj) { + if (MapUtils.isEmpty(jsonObj)) { + LOG.warn("Output jsonobj is empty"); + return DEFAULT_VALUE; + } + + String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); + String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); + String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); + if (StringUtils.isNotBlank(hostName) && StringUtils.isNotBlank(componentName) && StringUtils.isNotBlank(level)) { + LogFeederFilter componentFilter = LogConfigHandler.findComponentFilter(componentName); + if (componentFilter == null) { + return DEFAULT_VALUE; + } + List<String> allowedLevels = LogConfigHandler.getAllowedLevels(hostName, componentFilter); + if (CollectionUtils.isEmpty(allowedLevels)) { + allowedLevels.add(LogFeederConstants.ALL); + } + return LogFeederUtil.isListContains(allowedLevels, level, false); + } + else { + return DEFAULT_VALUE; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java new file mode 100644 index 0000000..12c744c --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.logconfig; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.SolrRequest.METHOD; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; + +public class LogConfigFetcher { + private static final Logger LOG = Logger.getLogger(LogConfigFetcher.class); + + private static LogConfigFetcher instance; + public synchronized static LogConfigFetcher getInstance() { + if (instance == null) { + try { + instance = new LogConfigFetcher(); + } catch (Exception e) { + String logMessageKey = LogConfigFetcher.class.getSimpleName() + "_SOLR_UTIL"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error constructing solrUtil", e, LOG, Level.WARN); + } + } + return instance; + } + + private SolrClient solrClient; + + private String solrDetail = ""; + + public LogConfigFetcher() throws Exception { + String url = LogFeederUtil.getStringProperty("logfeeder.solr.url"); + String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string"); + String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history"); + connectToSolr(url, zkConnectString, collection); + } + + private SolrClient connectToSolr(String url, String zkConnectString, String collection) throws Exception { + solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection + ", url=" + url; + + LOG.info("connectToSolr() " + solrDetail); + if (StringUtils.isEmpty(collection)) { + throw new Exception("For solr, collection name is mandatory. " + solrDetail); + } + + if (StringUtils.isEmpty(zkConnectString) && StringUtils.isBlank(url)) + throw new Exception("Both zkConnectString and URL are empty. zkConnectString=" + zkConnectString + ", collection=" + + collection + ", url=" + url); + + if (StringUtils.isNotEmpty(zkConnectString)) { + solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection; + LOG.info("Using zookeepr. " + solrDetail); + CloudSolrClient solrClouldClient = new CloudSolrClient(zkConnectString); + solrClouldClient.setDefaultCollection(collection); + solrClient = solrClouldClient; + checkSolrStatus(3 * 60 * 1000); + } else { + solrDetail = "collection=" + collection + ", url=" + url; + String collectionURL = url + "/" + collection; + LOG.info("Connecting to solr : " + collectionURL); + solrClient = new HttpSolrClient(collectionURL); + } + return solrClient; + } + + private boolean checkSolrStatus(int waitDurationMS) { + boolean status = false; + try { + long beginTimeMS = System.currentTimeMillis(); + long waitIntervalMS = 2000; + int pingCount = 0; + while (true) { + pingCount++; + CollectionAdminResponse response = null; + try { + CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List(); + response = colListReq.process(solrClient); + } catch (Exception ex) { + LOG.error("Con't connect to Solr. solrDetail=" + solrDetail, ex); + } + if (response != null && response.getStatus() == 0) { + LOG.info("Solr getCollections() is success. solr=" + solrDetail); + status = true; + break; + } + if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) { + LOG.error("Solr is not reachable even after " + (System.currentTimeMillis() - beginTimeMS) + + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr=" + + solrDetail + ", response=" + response); + break; + } else { + LOG.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount + ". Will sleep for " + + waitIntervalMS + " ms and try again." + " solr=" + solrDetail + ", response=" + response); + } + Thread.sleep(waitIntervalMS); + } + } catch (Throwable t) { + LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, t); + } + return status; + } + + public Map<String, Object> getConfigDoc() { + HashMap<String, Object> configMap = new HashMap<String, Object>(); + SolrQuery solrQuery = new SolrQuery(); + solrQuery.setQuery("*:*"); + String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME; + solrQuery.setFilterQueries(fq); + try { + QueryResponse response = process(solrQuery); + if (response != null) { + SolrDocumentList documentList = response.getResults(); + if (CollectionUtils.isNotEmpty(documentList)) { + SolrDocument configDoc = documentList.get(0); + String configJson = LogFeederUtil.getGson().toJson(configDoc); + configMap = (HashMap<String, Object>) LogFeederUtil.toJSONObject(configJson); + } + } + } catch (Exception e) { + String logMessageKey = this.getClass().getSimpleName() + "_FETCH_FILTER_CONFIG_ERROR"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error getting filter config from solr", e, LOG, Level.ERROR); + } + return configMap; + } + + private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException { + if (solrClient != null) { + QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST); + return queryResponse; + } else { + LOG.error("solrClient can't be null"); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java new file mode 100644 index 0000000..4f52b0b --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.logconfig; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; + +public class LogConfigHandler extends Thread { + private static final Logger LOG = Logger.getLogger(LogConfigHandler.class); + + private static final int DEFAULT_SOLR_CONFIG_INTERVAL = 5; + private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; + private static final String TIMEZONE = "GMT"; + + static { + TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE)); + } + + private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() { + protected DateFormat initialValue() { + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE)); + return dateFormat; + } + }; + + private static LogFeederFilterWrapper logFeederFilterWrapper; + + private static boolean running = false; + + public static void handleConfig() { + boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false); + if (!filterEnable) { + LOG.info("Logfeeder filter Scheduler is disabled."); + return; + } + if (!running) { + new LogConfigHandler().start(); + running = true; + LOG.info("Logfeeder Filter Thread started!"); + } else { + LOG.warn("Logfeeder Filter Thread is already running."); + } + } + + private LogConfigHandler() { + setName(getClass().getSimpleName()); + setDaemon(true); + } + + @Override + public void run() { + String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string"); + String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url"); + if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(solrUrl)) { + LOG.warn("Neither Solr ZK Connect String nor solr Url for UserConfig/History is set." + + "Won't look for level configuration from Solr."); + return; + } + + int solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", DEFAULT_SOLR_CONFIG_INTERVAL); + do { + LOG.debug("Updating config from solr after every " + solrConfigInterval + " sec."); + fetchConfig(); + try { + Thread.sleep(1000 * solrConfigInterval); + } catch (InterruptedException e) { + LOG.error(e.getLocalizedMessage(), e.getCause()); + } + } while (true); + } + + private synchronized void fetchConfig() { + LogConfigFetcher fetcher = LogConfigFetcher.getInstance(); + if (fetcher != null) { + Map<String, Object> configDocMap = fetcher.getConfigDoc(); + String configJson = (String) configDocMap.get(LogFeederConstants.VALUES); + if (configJson != null) { + logFeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, LogFeederFilterWrapper.class); + } + } + } + + public static boolean isFilterAvailable() { + return logFeederFilterWrapper != null; + } + + public static List<String> getAllowedLevels(String hostName, LogFeederFilter componentFilter) { + String componentName = componentFilter.getLabel(); + List<String> hosts = componentFilter.getHosts(); + List<String> defaultLevels = componentFilter.getDefaultLevels(); + List<String> overrideLevels = componentFilter.getOverrideLevels(); + String expiryTime = componentFilter.getExpiryTime(); + + // check is user override or not + if (StringUtils.isNotEmpty(expiryTime) || CollectionUtils.isNotEmpty(overrideLevels) || CollectionUtils.isNotEmpty(hosts)) { + if (CollectionUtils.isEmpty(hosts)) { // hosts list is empty or null consider it apply on all hosts + hosts.add(LogFeederConstants.ALL); + } + + if (LogFeederUtil.isListContains(hosts, hostName, false)) { + if (isFilterExpired(componentFilter)) { + LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " + + componentFilter.getExpiryTime()); + return defaultLevels; + } else { + return overrideLevels; + } + } + } + return defaultLevels; + } + + private static boolean isFilterExpired(LogFeederFilter logfeederFilter) { + if (logfeederFilter == null) + return false; + + Date filterEndDate = parseFilterExpireDate(logfeederFilter); + if (filterEndDate == null) { + return false; + } + + Date currentDate = new Date(); + if (!currentDate.before(filterEndDate)) { + LOG.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts : [" + + StringUtils.join(logfeederFilter.getHosts(), ',') + "] is expired because of filter endTime : " + + formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate)); + return true; + } else { + return false; + } + } + + private static Date parseFilterExpireDate(LogFeederFilter vLogfeederFilter) { + String expiryTime = vLogfeederFilter.getExpiryTime(); + if (StringUtils.isNotEmpty(expiryTime)) { + try { + return formatter.get().parse(expiryTime); + } catch (ParseException e) { + LOG.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel() + + " and hosts : [" + StringUtils.join(vLogfeederFilter.getHosts(), ',') + "]"); + } + } + return null; + } + + public static LogFeederFilter findComponentFilter(String componentName) { + if (logFeederFilterWrapper != null) { + HashMap<String, LogFeederFilter> filter = logFeederFilterWrapper.getFilter(); + if (filter != null) { + LogFeederFilter componentFilter = filter.get(componentName); + if (componentFilter != null) { + return componentFilter; + } + } + } + LOG.trace("Filter is not there for component :" + componentName); + return null; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java deleted file mode 100644 index 09673a0..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.logconfig; - -public class LogFeederConstants { - - public static final String ALL = "all"; - public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config"; - public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN"; - // solr fields - public static final String SOLR_LEVEL = "level"; - public static final String SOLR_COMPONENT = "type"; - public static final String SOLR_HOST = "host"; - - // UserConfig Constants History - public static final String VALUES = "jsons"; - public static final String ROW_TYPE = "rowtype"; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java new file mode 100644 index 0000000..60c8ae8 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.logconfig; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class LogFeederFilter { + + private String label; + private List<String> hosts; + private List<String> defaultLevels; + private List<String> overrideLevels; + private String expiryTime; + + public LogFeederFilter() { + hosts = new ArrayList<String>(); + defaultLevels = new ArrayList<String>(); + overrideLevels = new ArrayList<String>(); + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public List<String> getHosts() { + return hosts; + } + + public void setHosts(List<String> hosts) { + this.hosts = hosts; + } + + public List<String> getDefaultLevels() { + return defaultLevels; + } + + public void setDefaultLevels(List<String> defaultLevels) { + this.defaultLevels = defaultLevels; + } + + public List<String> getOverrideLevels() { + return overrideLevels; + } + + public void setOverrideLevels(List<String> overrideLevels) { + this.overrideLevels = overrideLevels; + } + + public String getExpiryTime() { + return expiryTime; + } + + public void setExpiryTime(String expiryTime) { + this.expiryTime = expiryTime; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java new file mode 100644 index 0000000..9199cd3 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.logconfig; + +import java.util.HashMap; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class LogFeederFilterWrapper { + + private HashMap<String, LogFeederFilter> filter; + private String id; + + public HashMap<String, LogFeederFilter> getFilter() { + return filter; + } + + public void setFilter(HashMap<String, LogFeederFilter> filter) { + this.filter = filter; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java deleted file mode 100644 index bc807193..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.logconfig; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.log4j.Logger; - -public enum LogfeederScheduler { - - INSTANCE; - - private Logger logger = Logger.getLogger(LogfeederScheduler.class); - - private static boolean running = false; - - public synchronized void start() { - boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false); - if (!filterEnable) { - logger.info("Logfeeder filter Scheduler is disabled."); - return; - } - if (!running) { - for (Thread thread : getThreadList()) { - thread.start(); - } - running = true; - logger.info("Logfeeder Scheduler started!"); - } else { - logger.warn("Logfeeder Scheduler is already running."); - } - } - - private List<Thread> getThreadList() { - List<Thread> tasks = new ArrayList<Thread>(); - Thread configMonitor = new FetchConfigFromSolr(true); - tasks.add(configMonitor); - return tasks; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java deleted file mode 100644 index b5e4eb3..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.logconfig.filter; - -import java.util.List; -import java.util.Map; - -import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr; -import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.ambari.logfeeder.view.VLogfeederFilter; -import org.apache.log4j.Logger; - -class ApplyLogFilter extends DefaultDataFilter { - - private static Logger logger = Logger.getLogger(ApplyLogFilter.class); - - @Override - public boolean applyFilter(Map<String, Object> jsonObj, boolean defaultValue) { - if (isEmpty(jsonObj)) { - logger.warn("Output jsonobj is empty"); - return defaultValue; - } - String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); - if (isNotEmpty(hostName)) { - String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); - if (isNotEmpty(componentName)) { - String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); - if (isNotEmpty(level)) { - VLogfeederFilter componentFilter = FetchConfigFromSolr.findComponentFilter(componentName); - if (componentFilter == null) { - return defaultValue; - } - List<String> allowedLevels = FetchConfigFromSolr.getAllowedLevels( - hostName, componentFilter); - if (allowedLevels == null || allowedLevels.isEmpty()) { - allowedLevels.add(LogFeederConstants.ALL); - } - return LogFeederUtil.isListContains(allowedLevels, level, false); - } - } - } - return defaultValue; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java deleted file mode 100644 index 04d2ca4..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.logconfig.filter; - -import java.util.Map; - -/** - * Default filter to allow everything - */ -class DefaultDataFilter { - public boolean applyFilter(Map<String, Object> outputJsonObj, boolean defaultValue) { - return defaultValue; - } - - protected boolean isEmpty(Map<String, Object> map) { - if (map == null || map.isEmpty()) { - return true; - } - return false; - } - - protected boolean isEmpty(String str) { - if (str == null || str.trim().isEmpty()) { - return true; - } - return false; - } - - protected boolean isNotEmpty(String str) { - return !isEmpty(str); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java deleted file mode 100644 index 3a8eae9..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.logconfig.filter; - -import java.util.Map; - -import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.log4j.Logger; - -/** - * Read configuration from solr and filter the log - */ -public enum FilterLogData { - INSTANCE; - private ApplyLogFilter applyLogFilter = new ApplyLogFilter(); - private static Logger logger = Logger.getLogger(FilterLogData.class); - // by default allow every log - boolean defaultValue = true; - - public boolean isAllowed(String jsonBlock) { - if (jsonBlock == null || jsonBlock.isEmpty()) { - return defaultValue; - } - Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock); - return isAllowed(jsonObj); - } - - public boolean isAllowed(Map<String, Object> jsonObj) { - boolean isAllowed = applyLogFilter.applyFilter(jsonObj, defaultValue); - if (!isAllowed) { - logger.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); - } - return isAllowed; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java index 906dd25..96709c0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java @@ -26,22 +26,18 @@ public abstract class Mapper { protected String fieldName; private String mapClassCode; - public boolean init(String inputDesc, String fieldName, - String mapClassCode, Object mapConfigs) { + public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs); + + protected void init(String inputDesc, String fieldName, String mapClassCode) { this.inputDesc = inputDesc; this.fieldName = fieldName; this.mapClassCode = mapClassCode; - return true; } - public Object apply(Map<String, Object> jsonObj, Object value) { - return value; - } + public abstract Object apply(Map<String, Object> jsonObj, Object value); @Override public String toString() { - return "mapClass=" + mapClassCode + ", input=" + inputDesc - + ", fieldName=" + fieldName; + return "mapClass=" + mapClassCode + ", input=" + inputDesc + ", fieldName=" + fieldName; } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java index 6dbf8be..eb3ae01 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java @@ -31,31 +31,29 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; public class MapperDate extends Mapper { - private static final Logger logger = Logger.getLogger(MapperDate.class); + private static final Logger LOG = Logger.getLogger(MapperDate.class); private SimpleDateFormat targetDateFormatter = null; private boolean isEpoch = false; private SimpleDateFormat srcDateFormatter=null; @Override - public boolean init(String inputDesc, String fieldName, - String mapClassCode, Object mapConfigs) { - super.init(inputDesc, fieldName, mapClassCode, mapConfigs); + public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + init(inputDesc, fieldName, mapClassCode); if (!(mapConfigs instanceof Map)) { - logger.fatal("Can't initialize object. mapConfigs class is not of type Map. " - + mapConfigs.getClass().getName() - + ", map=" - + this.toString()); + LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() + + ", map=" + this); return false; } + @SuppressWarnings("unchecked") Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs; String targetDateFormat = (String) mapObjects.get("target_date_pattern"); String srcDateFormat = (String) mapObjects.get("src_date_pattern"); if (StringUtils.isEmpty(targetDateFormat)) { - logger.fatal("Date format for map is empty. " + this.toString()); + LOG.fatal("Date format for map is empty. " + this); } else { - logger.info("Date mapper format is " + targetDateFormat); + LOG.info("Date mapper format is " + targetDateFormat); if (targetDateFormat.equalsIgnoreCase("epoch")) { isEpoch = true; @@ -68,8 +66,7 @@ public class MapperDate extends Mapper { } return true; } catch (Throwable ex) { - logger.fatal("Error creating date format. format=" - + targetDateFormat + ". " + this.toString()); + LOG.fatal("Error creating date format. format=" + targetDateFormat + ". " + this.toString()); } } } @@ -84,7 +81,7 @@ public class MapperDate extends Mapper { long ms = Long.parseLong(value.toString()) * 1000; value = new Date(ms); } else if (targetDateFormatter != null) { - if(srcDateFormatter!=null){ + if (srcDateFormatter != null) { Date srcDate = srcDateFormatter.parse(value.toString()); //set year in src_date when src_date does not have year component if (!srcDateFormatter.toPattern().contains("yy")) { @@ -108,12 +105,9 @@ public class MapperDate extends Mapper { } jsonObj.put(fieldName, value); } catch (Throwable t) { - LogFeederUtil.logErrorMessageByInterval(this.getClass() - .getSimpleName() + ":apply", - "Error applying date transformation. isEpoch=" - + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"") - + ", value=" + value + ". " + this.toString(), - t, logger, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying date transformation." + + " isEpoch=" + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"") + + ", value=" + value + ". " + this.toString(), t, LOG, Level.ERROR); } } return value; http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java index c692a9d..9b6e83c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java @@ -30,24 +30,23 @@ import org.apache.log4j.Logger; * Overrides the value for the field */ public class MapperFieldName extends Mapper { - private static final Logger logger = Logger.getLogger(MapperFieldName.class); + private static final Logger LOG = Logger.getLogger(MapperFieldName.class); private String newValue = null; @Override - public boolean init(String inputDesc, String fieldName, - String mapClassCode, Object mapConfigs) { - super.init(inputDesc, fieldName, mapClassCode, mapConfigs); + public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + init(inputDesc, fieldName, mapClassCode); if (!(mapConfigs instanceof Map)) { - logger.fatal("Can't initialize object. mapConfigs class is not of type Map. " - + mapConfigs.getClass().getName()); + LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName()); return false; } + @SuppressWarnings("unchecked") Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs; newValue = (String) mapObjects.get("new_fieldname"); if (StringUtils.isEmpty(newValue)) { - logger.fatal("Map field value is empty."); + LOG.fatal("Map field value is empty."); return false; } return true; @@ -59,12 +58,9 @@ public class MapperFieldName extends Mapper { jsonObj.remove(fieldName); jsonObj.put(newValue, value); } else { - LogFeederUtil.logErrorMessageByInterval(this.getClass() - .getSimpleName() + ":apply", - "New fieldName is null, so transformation is not applied. " - + this.toString(), null, logger, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", + "New fieldName is null, so transformation is not applied. " + this.toString(), null, LOG, Level.ERROR); } return value; } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java index e618261..87cda65 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java @@ -30,25 +30,25 @@ import org.apache.log4j.Logger; * Overrides the value for the field */ public class MapperFieldValue extends Mapper { - private Logger logger = Logger.getLogger(MapperFieldValue.class); + private static final Logger LOG = Logger.getLogger(MapperFieldValue.class); + private String prevValue = null; private String newValue = null; @Override - public boolean init(String inputDesc, String fieldName, - String mapClassCode, Object mapConfigs) { - super.init(inputDesc, fieldName, mapClassCode, mapConfigs); + public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) { + init(inputDesc, fieldName, mapClassCode); if (!(mapConfigs instanceof Map)) { - logger.fatal("Can't initialize object. mapConfigs class is not of type Map. " - + mapConfigs.getClass().getName()); + LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName()); return false; } + @SuppressWarnings("unchecked") Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs; prevValue = (String) mapObjects.get("pre_value"); newValue = (String) mapObjects.get("post_value"); if (StringUtils.isEmpty(newValue)) { - logger.fatal("Map field value is empty."); + LOG.fatal("Map field value is empty."); return false; } return true; @@ -56,20 +56,15 @@ public class MapperFieldValue extends Mapper { @Override public Object apply(Map<String, Object> jsonObj, Object value) { - if (newValue != null) { - if (prevValue != null) { - if (prevValue.equalsIgnoreCase(value.toString())) { - value = newValue; - jsonObj.put(fieldName, value); - } + if (newValue != null && prevValue != null) { + if (prevValue.equalsIgnoreCase(value.toString())) { + value = newValue; + jsonObj.put(fieldName, value); } } else { - LogFeederUtil.logErrorMessageByInterval( - this.getClass().getSimpleName() + ":apply", - "New value is null, so transformation is not applied. " - + this.toString(), null, logger, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", + "New value is null, so transformation is not applied. " + this.toString(), null, LOG, Level.ERROR); } return value; } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java index 88b812e..21ca165 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.metrics; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.log4j.Logger; @@ -28,20 +29,19 @@ import java.util.Collection; // TODO: Refactor for failover public class LogFeederAMSClient extends AbstractTimelineMetricsSink { - private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class); + private static final Logger LOG = Logger.getLogger(LogFeederAMSClient.class); private String collectorHosts = null; public LogFeederAMSClient() { - collectorHosts = LogFeederUtil - .getStringProperty("logfeeder.metrics.collector.hosts"); - if (collectorHosts != null && collectorHosts.trim().length() == 0) { + collectorHosts = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.hosts"); + if (StringUtils.isBlank(collectorHosts)) { collectorHosts = null; } if (collectorHosts != null) { collectorHosts = collectorHosts.trim(); } - logger.info("AMS collector URL=" + collectorHosts); + LOG.info("AMS collector URL=" + collectorHosts); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java deleted file mode 100644 index abb84c7..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.metrics; - -public class MetricCount { - public String metricsName = null; - public boolean isPointInTime = false; - - public long count = 0; - public long prevLogCount = 0; - public long prevLogMS = System.currentTimeMillis(); - public long prevPublishCount = 0; - public int publishCount = 0; // Count of published metrics. Used for first time sending metrics -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java new file mode 100644 index 0000000..e7f5d37 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.metrics; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class MetricData { + public final String metricsName; + public final boolean isPointInTime; + + public MetricData(String metricsName, boolean isPointInTime) { + this.metricsName = metricsName; + this.isPointInTime = isPointInTime; + } + + public long value = 0; + public long prevPublishValue = 0; + + public long prevLogValue = 0; + public long prevLogTime = System.currentTimeMillis(); + + public int publishCount = 0; // Number of times the metric was published so far + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java new file mode 100644 index 0000000..942c0b4 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.metrics; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeMap; + +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.log4j.Logger; + +public class MetricsManager { + private static final Logger LOG = Logger.getLogger(MetricsManager.class); + + private boolean isMetricsEnabled = false; + private String nodeHostName = null; + private String appId = "logfeeder"; + + private long lastPublishTimeMS = 0; // Let's do the first publish immediately + private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock + + private int publishIntervalMS = 60 * 1000; + private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep the metrics in memory forever + private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>(); + private LogFeederAMSClient amsClient = null; + + public void init() { + LOG.info("Initializing MetricsManager()"); + amsClient = new LogFeederAMSClient(); + + if (amsClient.getCollectorUri(null) != null) { + findNodeHostName(); + if (nodeHostName == null) { + isMetricsEnabled = false; + LOG.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics"); + } else { + isMetricsEnabled = true; + LOG.info("LogFeeder Metrics is enabled. Metrics host=" + amsClient.getCollectorUri(null)); + } + } else { + LOG.info("LogFeeder Metrics publish is disabled"); + } + } + + private void findNodeHostName() { + nodeHostName = LogFeederUtil.getStringProperty("node.hostname"); + if (nodeHostName == null) { + try { + nodeHostName = InetAddress.getLocalHost().getHostName(); + } catch (Throwable e) { + LOG.warn("Error getting hostname using InetAddress.getLocalHost().getHostName()", e); + } + } + if (nodeHostName == null) { + try { + nodeHostName = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (Throwable e) { + LOG.warn("Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", e); + } + } + } + + public boolean isMetricsEnabled() { + return isMetricsEnabled; + } + + public synchronized void useMetrics(List<MetricData> metricsList) { + if (!isMetricsEnabled) { + return; + } + LOG.info("useMetrics() metrics.size=" + metricsList.size()); + long currMS = System.currentTimeMillis(); + + gatherMetrics(metricsList, currMS); + publishMetrics(currMS); + } + + private void gatherMetrics(List<MetricData> metricsList, long currMS) { + Long currMSLong = new Long(currMS); + for (MetricData metric : metricsList) { + if (metric.metricsName == null) { + LOG.debug("metric.metricsName is null"); + continue; + } + long currCount = metric.value; + if (!metric.isPointInTime && metric.publishCount > 0 && currCount <= metric.prevPublishValue) { + LOG.debug("Nothing changed. " + metric.metricsName + ", currCount=" + currCount + ", prevPublishCount=" + + metric.prevPublishValue); + continue; + } + metric.publishCount++; + + LOG.debug("Ensuring metrics=" + metric.metricsName); + TimelineMetric timelineMetric = metricsMap.get(metric.metricsName); + if (timelineMetric == null) { + LOG.debug("Creating new metric obbject for " + metric.metricsName); + timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metric.metricsName); + timelineMetric.setHostName(nodeHostName); + timelineMetric.setAppId(appId); + timelineMetric.setStartTime(currMS); + timelineMetric.setType("Long"); + timelineMetric.setMetricValues(new TreeMap<Long, Double>()); + + metricsMap.put(metric.metricsName, timelineMetric); + } + + LOG.debug("Adding metrics=" + metric.metricsName); + if (metric.isPointInTime) { + timelineMetric.getMetricValues().put(currMSLong, new Double(currCount)); + } else { + Double value = timelineMetric.getMetricValues().get(currMSLong); + if (value == null) { + value = new Double(0); + } + value += (currCount - metric.prevPublishValue); + timelineMetric.getMetricValues().put(currMSLong, value); + metric.prevPublishValue = currCount; + } + } + } + + private void publishMetrics(long currMS) { + if (!metricsMap.isEmpty() && currMS - lastPublishTimeMS > publishIntervalMS) { + try { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(new ArrayList<TimelineMetric>(metricsMap.values())); + amsClient.emitMetrics(timelineMetrics); + + LOG.info("Published " + timelineMetrics.getMetrics().size() + " metrics to AMS"); + metricsMap.clear(); + lastPublishTimeMS = currMS; + } catch (Throwable t) { + LOG.warn("Error sending metrics to AMS.", t); + if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) { + LOG.error("AMS was not sent for last " + maxMetricsBuffer / 1000 + + " seconds. Purging it and will start rebuilding it again"); + metricsMap.clear(); + lastFailedPublishTimeMS = currMS; + } + } + } else { + LOG.info("Not publishing metrics. metrics.size()=" + metricsMap.size() + ", lastPublished=" + + (currMS - lastPublishTimeMS) / 1000 + " seconds ago, intervalConfigured=" + publishIntervalMS / 1000); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java deleted file mode 100644 index 33397c7..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.metrics; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.TreeMap; - -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.log4j.Logger; - -public class MetricsMgr { - private static final Logger logger = Logger.getLogger(MetricsMgr.class); - - private boolean isMetricsEnabled = false; - private String nodeHostName = null; - private String appId = "logfeeder"; - - private long lastPublishTimeMS = 0; // Let's do the first publish immediately - private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock - - private int publishIntervalMS = 60 * 1000; - private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep - // the metrics in memory forever - private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>(); - private LogFeederAMSClient amsClient = null; - - public void init() { - logger.info("Initializing MetricsMgr()"); - amsClient = new LogFeederAMSClient(); - - if (amsClient.getCollectorUri(null) != null) { - nodeHostName = LogFeederUtil.getStringProperty("node.hostname"); - if (nodeHostName == null) { - try { - nodeHostName = InetAddress.getLocalHost().getHostName(); - } catch (Throwable e) { - logger.warn( - "Error getting hostname using InetAddress.getLocalHost().getHostName()", - e); - } - if (nodeHostName == null) { - try { - nodeHostName = InetAddress.getLocalHost() - .getCanonicalHostName(); - } catch (Throwable e) { - logger.warn( - "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", - e); - } - } - } - if (nodeHostName == null) { - isMetricsEnabled = false; - logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics"); - } else { - isMetricsEnabled = true; - logger.info("LogFeeder Metrics is enabled. Metrics host=" - + amsClient.getCollectorUri(null)); - } - } else { - logger.info("LogFeeder Metrics publish is disabled"); - } - } - - public boolean isMetricsEnabled() { - return isMetricsEnabled; - } - - synchronized public void useMetrics(List<MetricCount> metricsList) { - if (!isMetricsEnabled) { - return; - } - logger.info("useMetrics() metrics.size=" + metricsList.size()); - long currMS = System.currentTimeMillis(); - Long currMSLong = new Long(currMS); - for (MetricCount metric : metricsList) { - if (metric.metricsName == null) { - logger.debug("metric.metricsName is null"); - // Metrics is not meant to be published - continue; - } - long currCount = metric.count; - if (!metric.isPointInTime && metric.publishCount > 0 - && currCount <= metric.prevPublishCount) { - // No new data added, so let's ignore it - logger.debug("Nothing changed. " + metric.metricsName - + ", currCount=" + currCount + ", prevPublishCount=" - + metric.prevPublishCount); - continue; - } - metric.publishCount++; - - TimelineMetric timelineMetric = metricsMap.get(metric.metricsName); - if (timelineMetric == null) { - logger.debug("Creating new metric obbject for " - + metric.metricsName); - // First time for this metric - timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(metric.metricsName); - timelineMetric.setHostName(nodeHostName); - timelineMetric.setAppId(appId); - timelineMetric.setStartTime(currMS); - timelineMetric.setType("Long"); - timelineMetric.setMetricValues(new TreeMap<Long, Double>()); - - metricsMap.put(metric.metricsName, timelineMetric); - } - logger.debug("Adding metrics=" + metric.metricsName); - if (metric.isPointInTime) { - timelineMetric.getMetricValues().put(currMSLong, - new Double(currCount)); - } else { - Double value = timelineMetric.getMetricValues().get(currMSLong); - if (value == null) { - value = new Double(0); - } - value += (currCount - metric.prevPublishCount); - timelineMetric.getMetricValues().put(currMSLong, value); - metric.prevPublishCount = currCount; - } - } - - if (metricsMap.size() > 0 - && currMS - lastPublishTimeMS > publishIntervalMS) { - try { - // Time to publish - TimelineMetrics timelineMetrics = new TimelineMetrics(); - List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>(); - timeLineMetricList.addAll(metricsMap.values()); - timelineMetrics.setMetrics(timeLineMetricList); - amsClient.emitMetrics(timelineMetrics); - logger.info("Published " + timeLineMetricList.size() - + " metrics to AMS"); - metricsMap.clear(); - timeLineMetricList.clear(); - lastPublishTimeMS = currMS; - } catch (Throwable t) { - logger.warn("Error sending metrics to AMS.", t); - if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) { - logger.error("AMS was not sent for last " - + maxMetricsBuffer - / 1000 - + " seconds. Purging it and will start rebuilding it again"); - metricsMap.clear(); - lastFailedPublishTimeMS = currMS; - } - } - } else { - logger.info("Not publishing metrics. metrics.size()=" - + metricsMap.size() + ", lastPublished=" - + (currMS - lastPublishTimeMS) / 1000 - + " seconds ago, intervalConfigured=" + publishIntervalMS - / 1000); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java index 6f84251..bc6a553 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java @@ -26,16 +26,19 @@ import java.util.Map.Entry; import org.apache.ambari.logfeeder.common.ConfigBlock; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; public abstract class Output extends ConfigBlock { - private static final Logger logger = Logger.getLogger(Output.class); + private static final Logger LOG = Logger.getLogger(Output.class); private String destination = null; - protected MetricCount writeBytesMetric = new MetricCount(); + protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false); + protected String getWriteBytesMetricName() { + return null; + } @Override public String getShortDescription() { @@ -67,7 +70,7 @@ public abstract class Output extends ConfigBlock { * Extend this method to clean up */ public void close() { - logger.info("Calling base close()." + getShortDescription()); + LOG.info("Calling base close()." + getShortDescription()); isClosed = true; } @@ -91,7 +94,7 @@ public abstract class Output extends ConfigBlock { } @Override - public void addMetricsContainers(List<MetricCount> metricsList) { + public void addMetricsContainers(List<MetricData> metricsList) { super.addMetricsContainers(metricsList); metricsList.add(writeBytesMetric); } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java index 4a408f9..c46086e 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java @@ -27,18 +27,16 @@ import org.apache.ambari.logfeeder.input.InputMarker; * This contains the output json object and InputMarker. */ public class OutputData { - Map<String, Object> jsonObj; - InputMarker inputMarker; + public final Map<String, Object> jsonObj; + public final InputMarker inputMarker; public OutputData(Map<String, Object> jsonObj, InputMarker inputMarker) { - super(); this.jsonObj = jsonObj; this.inputMarker = inputMarker; } @Override public String toString() { - return "OutputData [jsonObj=" + jsonObj + ", inputMarker=" - + inputMarker + "]"; + return "OutputData [jsonObj=" + jsonObj + ", inputMarker=" + inputMarker + "]"; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java index 2d41a0b..fa4e17b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java @@ -28,16 +28,15 @@ import org.apache.log4j.Logger; */ public class OutputDevNull extends Output { - private static Logger logger = Logger.getLogger(OutputDevNull.class); + private static final Logger LOG = Logger.getLogger(OutputDevNull.class); @Override public void write(String block, InputMarker inputMarker){ - logger.trace("Ignore log block: " + block); + LOG.trace("Ignore log block: " + block); } @Override public void copyFile(File inputFile, InputMarker inputMarker) { - throw new UnsupportedOperationException( - "copyFile method is not yet supported for output=dev_null"); + throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null"); } }
