This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 8dd0150cd364f293c555e09fa47ad9e022b59d1f Author: Olivér Szabó <[email protected]> AuthorDate: Mon Jul 23 09:33:46 2018 +0200 AMBARI-23079. Log Feeder: support to use load balancer for Solr API (not only cloud client) (#1835) --- .../config/solr/LogLevelFilterManagerSolr.java | 2 +- .../ambari/logfeeder/plugin/common/ConfigItem.java | 13 +++++ .../logfeeder/common/LogFeederConstants.java | 1 + .../common/LogFeederSolrClientFactory.java | 64 ++++++++++++++++++++++ .../ambari/logfeeder/conf/ApplicationConfig.java | 17 ++---- .../ambari/logfeeder/conf/LogFeederProps.java | 28 +++++++++- .../apache/ambari/logfeeder/output/OutputSolr.java | 40 +++++++------- .../test-config/logfeeder/logfeeder.properties | 3 +- 8 files changed, 132 insertions(+), 36 deletions(-) diff --git a/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java b/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java index 8d8976b..0eabead 100644 --- a/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java +++ b/ambari-logsearch/ambari-logsearch-config-solr/src/main/java/org/apache/ambari/logsearch/config/solr/LogLevelFilterManagerSolr.java @@ -130,7 +130,7 @@ public class LogLevelFilterManagerSolr implements LogLevelFilterManager { } } } catch (Exception e) { - LOG.error("Error during getting log level filters: {}", e); + LOG.error("Error during getting log level filters: {}", e.getMessage()); } logLevelFilterMap.setFilter(logLevelFilterTreeMap); return logLevelFilterMap; diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java index 1cbbfd5..5b50a7e 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java @@ -111,6 +111,19 @@ public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implemen this.drain = drain; } + public List<String> getListValue(String key) { + return getListValue(key, null); + } + + public List<String> getListValue(String key, List<String> defaultValue) { + Object value = configs.get(key); + if (value != null) { + return (List<String>)value; + } else { + return defaultValue; + } + } + public String getStringValue(String property) { return getStringValue(property, null); } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index 10e38f9..251b4fc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -102,5 +102,6 @@ public class LogFeederConstants { public static final boolean MONITOR_SOLR_FILTER_STORAGE_DEFAULT = true; public static final String SOLR_ZK_CONNECTION_STRING = "logfeeder.solr.zk_connect_string"; + public static final String SOLR_URLS = "logfeeder.solr.urls"; } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java new file mode 100644 index 0000000..cf94fb5 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederSolrClientFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.common; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.LBHttpSolrClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class LogFeederSolrClientFactory { + + private static final Logger logger = LoggerFactory.getLogger(LogFeederSolrClientFactory.class); + + public SolrClient createSolrClient(String zkConnectionString, String[] solrUrls, String collection) { + logger.info("Creating solr client ..."); + logger.info("Using collection=" + collection); + if (solrUrls != null && solrUrls.length > 0) { + logger.info("Using lbHttpSolrClient with urls: {}", + StringUtils.join(appendTo("/" + collection, solrUrls), ",")); + LBHttpSolrClient.Builder builder = new LBHttpSolrClient.Builder(); + builder.withBaseSolrUrls(solrUrls); + return builder.build(); + } else { + logger.info("Using zookeepr. zkConnectString=" + zkConnectionString); + CloudSolrClient.Builder builder = new CloudSolrClient.Builder(); + builder.withZkHost(zkConnectionString); + CloudSolrClient solrClient = builder.build(); + solrClient.setDefaultCollection(collection); + return solrClient; + } + } + + private String[] appendTo(String toAppend, String... appendees) { + for (int i = 0; i < appendees.length; i++) { + appendees[i] = appendees[i] + toAppend; + } + return appendees; + } + +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java index b431464..ccae373 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java @@ -19,6 +19,7 @@ package org.apache.ambari.logfeeder.conf; import com.google.common.collect.Maps; +import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory; import org.apache.ambari.logfeeder.docker.DockerContainerRegistry; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.input.InputConfigUploader; @@ -38,9 +39,7 @@ import org.apache.ambari.logsearch.config.local.LogSearchConfigLogFeederLocal; import org.apache.ambari.logsearch.config.solr.LogLevelFilterManagerSolr; import org.apache.ambari.logsearch.config.solr.LogLevelFilterUpdaterSolr; import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK; -import org.apache.commons.lang3.StringUtils; import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; @@ -69,7 +68,7 @@ public class ApplicationConfig { } @Bean - @DependsOn("logSearchConfigLogFeeder") + @DependsOn({"logSearchConfigLogFeeder", "propertyConfigurer"}) public ConfigHandler configHandler() throws Exception { return new ConfigHandler(logSearchConfigLogFeeder()); } @@ -95,15 +94,9 @@ public class ApplicationConfig { @Bean public LogLevelFilterManager logLevelFilterManager() { if (logFeederProps.isSolrFilterStorage()) { - if (StringUtils.isNotEmpty(logFeederProps.getSolrZkConnectString())) { - CloudSolrClient.Builder builder = new CloudSolrClient.Builder(); - builder.withZkHost(logFeederProps.getSolrZkConnectString()); - CloudSolrClient solrClient = builder.build(); - solrClient.setDefaultCollection("history"); - return new LogLevelFilterManagerSolr(solrClient); - } else { - return null; // TODO: use lb http client - } + SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient( + logFeederProps.getSolrZkConnectString(), logFeederProps.getSolrUrls(), "history"); + return new LogLevelFilterManagerSolr(solrClient); } else { // no default filter manager return null; } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java index 8f73e2b..12408b4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.conf; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties; import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.AbstractEnvironment; @@ -179,6 +180,15 @@ public class LogFeederProps implements LogFeederProperties { @Value("${" + LogFeederConstants.SOLR_ZK_CONNECTION_STRING + ":}") private String solrZkConnectString; + @LogSearchPropertyDescription( + name = LogFeederConstants.SOLR_URLS, + description = "Comma separated solr urls (with protocol and port), override "+ LogFeederConstants.SOLR_ZK_CONNECTION_STRING + " config", + examples = {"https://localhost1:8983/solr,https://localhost2:8983"}, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.SOLR_URLS + ":}") + private String solrUrlsStr; + @Inject private LogEntryCacheConfig logEntryCacheConfig; @@ -285,7 +295,7 @@ public class LogFeederProps implements LogFeederProperties { } public boolean isUseLocalConfigs() { - return useLocalConfigs; + return this.useLocalConfigs; } public void setUseLocalConfigs(boolean useLocalConfigs) { @@ -316,6 +326,21 @@ public class LogFeederProps implements LogFeederProperties { this.solrFilterMonitor = solrFilterMonitor; } + public String getSolrUrlsStr() { + return this.solrUrlsStr; + } + + public void setSolrUrlsStr(String solrUrlsStr) { + this.solrUrlsStr = solrUrlsStr; + } + + public String[] getSolrUrls() { + if (StringUtils.isNotBlank(this.solrUrlsStr)) { + return this.solrUrlsStr.split(","); + } + return null; + } + @PostConstruct public void init() { properties = new Properties(); @@ -331,4 +356,5 @@ public class LogFeederProps implements LogFeederProperties { throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath"); } } + } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index 041c1bd..6b27553 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -19,11 +19,13 @@ package org.apache.ambari.logfeeder.output; +import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory; import org.apache.ambari.logfeeder.conf.LogFeederProps; import org.apache.ambari.logfeeder.plugin.input.InputMarker; import org.apache.ambari.logfeeder.plugin.output.Output; import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -76,6 +78,7 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { private String splitMode; private int splitInterval; private String zkConnectString; + private String[] solrUrls = null; private int maxIntervalMS; private int workers; private int maxBufferSize; @@ -121,8 +124,14 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { type = getStringValue("type"); zkConnectString = getStringValue("zk_connect_string"); - if (StringUtils.isEmpty(zkConnectString)) { - throw new Exception("For solr output the zk_connect_string property need to be set"); + List<String> solrUrlsList = getListValue("solr_urls"); + + if (StringUtils.isBlank(zkConnectString) && CollectionUtils.isEmpty(solrUrlsList)) { + throw new Exception("For solr output the zk_connect_string or solr_urls property need to be set"); + } + + if (CollectionUtils.isNotEmpty(solrUrlsList)) { + solrUrls = solrUrlsList.toArray(new String[0]); } skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME); @@ -176,42 +185,31 @@ public class OutputSolr extends Output<LogFeederProps, InputMarker> { private void createSolrWorkers() throws Exception, MalformedURLException { for (int count = 0; count < workers; count++) { - CloudSolrClient solrClient = getSolrClient(count); + SolrClient solrClient = getSolrClient(count); createSolrWorkerThread(count, solrClient); } } - CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException { - CloudSolrClient solrClient = createSolrClient(); + private SolrClient getSolrClient(int count) throws Exception, MalformedURLException { + SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(zkConnectString, solrUrls, collection); pingSolr(count, solrClient); - - return solrClient; - } - - private CloudSolrClient createSolrClient() throws Exception { - LOG.info("Using zookeepr. zkConnectString=" + zkConnectString); - LOG.info("Using collection=" + collection); - - CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(zkConnectString).build(); - solrClient.setDefaultCollection(collection); return solrClient; } - private void pingSolr(int count, CloudSolrClient solrClient) { + private void pingSolr(int count, SolrClient solrClient) { try { - LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString); + LOG.info("Pinging Solr server."); SolrPingResponse response = solrClient.ping(); if (response.getStatus() == 0) { LOG.info("Ping to Solr server is successful for worker=" + count); } else { LOG.warn( - String.format("Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s, " + - "response=%s", count, zkConnectString, collection, response)); + String.format("Ping to Solr server failed. It would check again. worker=%d, collection=%s, " + + "response=%s", count, collection, response)); } } catch (Throwable t) { LOG.warn(String.format( - "Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s", count, - zkConnectString, collection), t); + "Ping to Solr server failed. It would check again. worker=%d, collection=%s", count, collection), t); } } diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties index 20aed68..bd59765 100644 --- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties +++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties @@ -21,7 +21,6 @@ logfeeder.config.files=shipper-conf/global.config.json,\ shipper-conf/output.config.json logfeeder.log.filter.enable=true logfeeder.solr.config.interval=5 -logfeeder.solr.core.config.name=history logfeeder.solr.zk_connect_string=localhost:9983 logfeeder.cache.enabled=true logfeeder.cache.size=100 @@ -31,5 +30,7 @@ logfeeder.cache.last.dedup.enabled=true logsearch.config.zk_connect_string=localhost:9983 logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN logfeeder.docker.registry.enabled=true +logfeeder.solr.core.config.name=history +#logfeeder.solr.urls=http://solr:8983/solr #logfeeder.configs.local.enabled=true #logfeeder.configs.filter.solr.enabled=true \ No newline at end of file
