Repository: ambari Updated Branches: refs/heads/branch-2.4 3ce7d10b7 -> 92c8a265e
AMBARI-17277. Log Level filter not applied before Log Search Starts at first (Miklos Gergely via oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/92c8a265 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/92c8a265 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/92c8a265 Branch: refs/heads/branch-2.4 Commit: 92c8a265e85ba6783f26f1042dd8d13d7e3e4e15 Parents: 3ce7d10 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Tue Jun 21 16:19:46 2016 +0200 Committer: oleewere <oleew...@gmail.com> Committed: Tue Jun 21 16:34:49 2016 +0200 ---------------------------------------------------------------------- .../org/apache/ambari/logfeeder/LogFeeder.java | 4 +- .../logconfig/FetchConfigFromSolr.java | 4 ++ .../logfeeder/logconfig/LogFeederConstants.java | 2 +- .../ambari/logfeeder/output/OutputSolr.java | 71 ++++++++++++++------ .../apache/ambari/logfeeder/util/SolrUtil.java | 4 +- .../src/main/resources/logfeeder.properties | 3 +- .../ambari/logfeeder/output/OutputSolrTest.java | 2 + .../package/templates/logfeeder.properties.j2 | 2 +- 8 files changed, 64 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 166c0f3..d00ed67 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -120,6 +120,9 @@ public class LogFeeder { } } mergeAllConfigs(); + + LogfeederScheduler.INSTANCE.start(); + outMgr.setOutputList(outputList); for (Output output : outputList) { output.init(); @@ -127,7 +130,6 @@ public class LogFeeder { inputMgr.init(); metricsMgr.init(); //starting timer to fetch config from solr - LogfeederScheduler.INSTANCE.start(); logger.debug("=============="); } http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java index f2d074a..4240b86 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java @@ -153,6 +153,10 @@ public class FetchConfigFromSolr extends Thread { return defaultLevels; } + public static boolean isFilterAvailable() { + return logfeederFilterWrapper != null; + } + public static VLogfeederFilter findComponentFilter(String componentName) { if (logfeederFilterWrapper != null) { HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter(); http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java index f61dc1b..f177e49 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java @@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.logconfig; public class LogFeederConstants { public static final String ALL = "all"; - public static final String NAME = "log_feeder_config"; + public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config"; // solr fields public static final String SOLR_LEVEL = "level"; public static final String SOLR_COMPONENT = "type"; http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index a7202e7..c945ed7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -59,6 +60,8 @@ public class OutputSolr extends Output { private static final int DEFAULT_SPLIT_INTERVAL = 30; private static final int DEFAULT_NUMBER_OF_WORKERS = 1; + private static final int RETRY_INTERVAL = 30; + private String collection; private String splitMode; private int splitInterval; @@ -81,7 +84,7 @@ public class OutputSolr extends Output { createSolrWorkers(); } - private void initParams() { + private void initParams() throws Exception { statMetric.metricsName = "output.solr.write_logs"; writeBytesMetric.metricsName = "output.solr.write_bytes"; @@ -89,6 +92,8 @@ public class OutputSolr extends Output { if (!splitMode.equalsIgnoreCase("none")) { splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL); } + isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none"); + numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS); maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS); @@ -100,7 +105,12 @@ public class OutputSolr extends Output { maxBufferSize = 1; } - LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, " + "numberOfShards=%d. " + collection = getStringValue("collection"); + if (StringUtils.isEmpty(collection)) { + throw new Exception("Collection property is mandatory"); + } + + LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. " + getShortDescription(), workers, splitMode, splitInterval, numberOfShards)); } @@ -135,45 +145,44 @@ public class OutputSolr extends Output { } SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException { - SolrClient solrClient = null; + SolrClient solrClient = createSolrClient(solrUrl, zkHosts, collection); + pingSolr(solrUrl, zkHosts, count, solrClient); + waitForConfig(); + + return solrClient; + } + private SolrClient createSolrClient(String solrUrl, String zkHosts, String collection) throws Exception, MalformedURLException { + SolrClient solrClient; if (zkHosts != null) { - solrClient = createCloudSolrClient(zkHosts); + solrClient = createCloudSolrClient(zkHosts, collection); } else { - solrClient = createHttpSolarClient(solrUrl); + solrClient = createHttpSolarClient(solrUrl, collection); } - - pingSolr(solrUrl, zkHosts, count, solrClient); - return solrClient; } - private SolrClient createCloudSolrClient(String zkHosts) throws Exception { + private SolrClient createCloudSolrClient(String zkHosts, String collection) throws Exception { LOG.info("Using zookeepr. zkHosts=" + zkHosts); - collection = getStringValue("collection"); - if (StringUtils.isEmpty(collection)) { - throw new Exception("For solr cloud property collection is mandatory"); - } LOG.info("Using collection=" + collection); CloudSolrClient solrClient = new CloudSolrClient(zkHosts); solrClient.setDefaultCollection(collection); - isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none"); return solrClient; } - private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException { + private SolrClient createHttpSolarClient(String solrUrl, String collection) throws MalformedURLException { String[] solrUrls = StringUtils.split(solrUrl, ","); if (solrUrls.length == 1) { LOG.info("Using SolrURL=" + solrUrl); - return new HttpSolrClient(solrUrl); + return new HttpSolrClient(solrUrl + "/" + collection); } else { LOG.info("Using load balance solr client. solrUrls=" + solrUrl); - LOG.info("Initial URL for LB solr=" + solrUrls[0]); - LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0]); + LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection); + LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection); for (int i = 1; i < solrUrls.length; i++) { - LOG.info("Adding URL for LB solr=" + solrUrls[i]); - lbSolrClient.addSolrServer(solrUrls[i]); + LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection); + lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection); } return lbSolrClient; } @@ -194,11 +203,30 @@ public class OutputSolr extends Output { } } catch (Throwable t) { LOG.warn(String.format( - "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkHosts=%s, collection=%s", + "Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkHosts=%s, collection=%s", count, solrUrl, zkHosts, collection), t); } } + private void waitForConfig() throws SolrServerException, IOException { + if (!LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false)) { + return; + } + + while (true) { + LOG.info("Checking if config is available"); + if (FetchConfigFromSolr.isFilterAvailable()) { + LOG.info("Config is available"); + return; + } + try { + Thread.sleep(RETRY_INTERVAL * 1000); + } catch (InterruptedException e) { + LOG.error(e); + } + } + } + private void createSolrWorkerThread(int count, SolrClient solrClient) { SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient); solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count); @@ -281,7 +309,6 @@ public class OutputSolr extends Output { class SolrWorkerThread extends Thread { private static final String ROUTER_FIELD = "_router_field_"; - private static final int RETRY_INTERVAL = 30; private final SolrClient solrClient; private final Collection<SolrInputDocument> localBuffer = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java index 29feef7..31fbded 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java @@ -54,7 +54,7 @@ public class SolrUtil { private SolrUtil() throws Exception { String url = LogFeederUtil.getStringProperty("logfeeder.solr.url"); String zkHosts = LogFeederUtil.getStringProperty("logfeeder.solr.zkhosts"); - String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.history", "history"); + String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history"); connectToSolr(url, zkHosts, collection); } @@ -180,7 +180,7 @@ public class SolrUtil { HashMap<String, Object> configMap = new HashMap<String, Object>(); SolrQuery solrQuery = new SolrQuery(); solrQuery.setQuery("*:*"); - String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.NAME; + String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME; solrQuery.setFilterQueries(fq); try { QueryResponse response = process(solrQuery); http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties index 6cba826..b4655cc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties @@ -19,7 +19,6 @@ logfeeder.metrics.collector.hosts= #filter config logfeeder.log.filter.enable=true logfeeder.solr.config.interval=5 -logfeeder.solr.core.history=history logfeeder.solr.zkhosts= logfeeder.solr.url= @@ -28,3 +27,5 @@ logfeeder.solr.jaas.file=/usr/lib/ambari-logsearch-logfeeder/logsearch_solr_jaas #logfeeder tmp dir logfeeder.tmp.dir=/tmp/$username/logfeeder/ + +logfeeder.solr.core.config.name=history http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java index afbccca..3014ed8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java @@ -83,6 +83,7 @@ public class OutputSolrTest { Map<String, Object> config = new HashMap<String, Object>(); config.put("url", "some url"); config.put("workers", "3"); + config.put("collection", "some collection"); outputSolr.loadConfig(config); outputSolr.init(); @@ -153,6 +154,7 @@ public class OutputSolrTest { Map<String, Object> config = new HashMap<String, Object>(); config.put("workers", "3"); + config.put("collection", "some collection"); outputSolr.loadConfig(config); outputSolr.init(); http://git-wip-us.apache.org/repos/asf/ambari/blob/92c8a265/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 index 9887e76..31c252a 100644 --- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 +++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/package/templates/logfeeder.properties.j2 @@ -18,8 +18,8 @@ logfeeder.metrics.collector.hosts={{logfeeder_metrics_collector_hosts}} logfeeder.config.files={{logfeeder_config_files}} logfeeder.log.filter.enable={{logfeeder_log_filter_enable}} logfeeder.solr.config.interval={{logfeeder_solr_config_interval}} -logfeeder.solr.core.history=history logfeeder.solr.zkhosts={{zookeeper_quorum}}{{logsearch_solr_znode}} +logfeeder.solr.core.config.name=history # Custom properties {% for key, value in logfeeder_custom_properties.items() %}