http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java new file mode 100644 index 0000000..dd67d07 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.ambari.logfeeder.output;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

import org.apache.ambari.logfeeder.ConfigBlock;
import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.MetricCount;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.log4j.Logger;

import com.google.gson.reflect.TypeToken;

/**
 * Base class for log feeder outputs.
 *
 * It keeps the destination name, a "bytes written" metric and a closed flag,
 * and provides a default JSON serialization path: a map is rendered with Gson
 * and handed to {@link #write(String, InputMarker)}, which does nothing here.
 */
public abstract class Output extends ConfigBlock {
  private static final Logger logger = Logger.getLogger(Output.class);

  /** Counts the bytes written by this output; registered in {@link #addMetricsContainers(List)}. */
  public MetricCount writeBytesMetric = new MetricCount();

  /** Generic type token for a String-to-String map. */
  Type jsonType = new TypeToken<Map<String, String>>() {
  }.getType();

  /** Destination name; when set it is also used as the thread name. */
  String destination = null;

  /** Set to true by {@link #close()}. */
  boolean isClosed = false;

  /**
   * @return always {@code null} in this base class
   */
  @Override
  public String getShortDescription() {
    // TODO Auto-generated method stub
    return null;
  }

  /**
   * @return the destination when one is set, otherwise the name chosen by the
   *         parent class
   */
  @Override
  public String getNameForThread() {
    return destination != null ? destination : super.getNameForThread();
  }

  /**
   * Writes an already serialized block. The base implementation is a no-op.
   *
   * @param block       serialized text to write
   * @param inputMarker marker of the input the block came from
   * @throws Exception declared for the benefit of overriding classes
   */
  public void write(String block, InputMarker inputMarker) throws Exception {
    // No-op. Please implement in sub classes
  }

  /**
   * Serializes the map to JSON with Gson and forwards it to
   * {@link #write(String, InputMarker)}.
   *
   * @param jsonObj     fields of the log entry
   * @param inputMarker marker of the input the entry came from
   * @throws Exception whatever the string based write throws
   */
  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
      throws Exception {
    String serialized = LogFeederUtil.getGson().toJson(jsonObj);
    write(serialized, inputMarker);
  }

  /**
   * Called on shutdown. Extend this method to clean up; the base
   * implementation only logs and marks the output as closed.
   */
  public void close() {
    logger.info("Calling base close()." + getShortDescription());
    isClosed = true;
  }

  /**
   * @return true once {@link #close()} has run
   */
  public boolean isClosed() {
    return isClosed;
  }

  /**
   * @return number of entries still waiting to be written; always 0 here
   */
  public long getPendingCount() {
    return 0;
  }

  public String getDestination() {
    return destination;
  }

  public void setDestination(String destination) {
    this.destination = destination;
  }

  /**
   * Registers the parent's metrics and then the bytes-written metric.
   */
  @Override
  public void addMetricsContainers(List<MetricCount> metricsList) {
    super.addMetricsContainers(metricsList);
    metricsList.add(writeBytesMetric);
  }

  /**
   * Logs the parent's statistics followed by the bytes-written statistic.
   */
  @Override
  public synchronized void logStat() {
    super.logStat();

    // Printing stat for writeBytesMetric
    logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java new file mode 100644 index 0000000..8df1d29 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import java.util.Map; + +import org.apache.ambari.logfeeder.input.InputMarker; + +/** + * This contains the output json object and InputMarker. 
+ */
public class OutputData {
  // Package-private on purpose: read directly by the outputs in this package.
  Map<String, Object> jsonObj;
  InputMarker inputMarker;

  /**
   * Pairs a log entry with the marker of the input it was read from.
   *
   * @param jsonObj     fields of the log entry
   * @param inputMarker marker of the input the entry came from
   */
  public OutputData(Map<String, Object> jsonObj, InputMarker inputMarker) {
    this.inputMarker = inputMarker;
    this.jsonObj = jsonObj;
  }

  /**
   * @return a debug rendering containing both members
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("OutputData [jsonObj=");
    text.append(jsonObj);
    text.append(", inputMarker=");
    text.append(inputMarker);
    text.append("]");
    return text.toString();
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java new file mode 100644 index 0000000..b6e36d6 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.ambari.logfeeder.output;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.Map;

import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.log4j.Logger;

/**
 * Output that appends every log entry as one line to a local file.
 *
 * Configuration properties read in {@link #init()}:
 * <ul>
 * <li>{@code path} - file to append to; when missing the output stays
 * uninitialized and every write is silently skipped</li>
 * <li>{@code codec} - {@code json} (default) or {@code csv}; CSV
 * serialization is not implemented yet, so with {@code csv} no records are
 * written</li>
 * </ul>
 */
public class OutputFile extends Output {
  static Logger logger = Logger.getLogger(OutputFile.class);

  private static final String CODEC_JSON = "json";
  private static final String CODEC_CSV = "csv";

  PrintWriter outWriter = null;
  String filePath = null;
  String codec;

  /**
   * Reads the configuration, creates the parent directory when needed and
   * opens the file in append mode.
   *
   * @throws Exception when the parent initialization fails or the file cannot
   *                   be opened
   */
  @Override
  public void init() throws Exception {
    super.init();

    filePath = getStringValue("path");
    if (filePath == null || filePath.isEmpty()) {
      logger.error("Filepath config property <path> is not set in config file.");
      return;
    }
    codec = resolveCodec(getStringValue("codec"));
    logger.info("Out filePath=" + filePath + ", codec=" + codec);
    if (CODEC_CSV.equals(codec)) {
      logger.warn("csv codec is not implemented yet. No records will be written. filePath="
          + filePath);
    }

    File outFile = new File(filePath);
    File parentDir = outFile.getParentFile();
    if (parentDir != null && !parentDir.isDirectory() && !parentDir.mkdirs()) {
      // Not fatal by itself: opening the file below reports the real error.
      logger.warn("Could not create parent directory " + parentDir.getAbsolutePath());
    }

    outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile,
        true)));

    logger.info("init() is successfull. filePath="
        + outFile.getAbsolutePath());
  }

  /**
   * Maps the configured codec name to one of the supported codecs.
   *
   * @param configured raw value of the {@code codec} property, may be null
   * @return {@code "csv"} or {@code "json"}; anything unknown falls back to
   *         json
   */
  private static String resolveCodec(String configured) {
    if (configured == null || configured.trim().isEmpty()) {
      return CODEC_JSON;
    }
    String normalized = configured.trim();
    if (normalized.equalsIgnoreCase(CODEC_CSV)) {
      return CODEC_CSV;
    }
    if (normalized.equalsIgnoreCase(CODEC_JSON)) {
      // The previous code assigned "csv" here, so an explicit codec=json
      // selected the unimplemented CSV branch and nothing was written.
      return CODEC_JSON;
    }
    logger.error("Unsupported codec type. codec=" + configured
        + ", will use json");
    return CODEC_JSON;
  }

  /**
   * Closes the file, if it was opened, and marks the output as closed.
   */
  @Override
  public void close() {
    logger.info("Closing file." + getShortDescription());
    if (outWriter != null) {
      try {
        outWriter.close();
      } catch (Throwable t) {
        // Best effort: shutdown must continue even if the close fails.
        logger.warn("Error closing file." + getShortDescription(), t);
      }
    }
    isClosed = true;
  }

  /**
   * Serializes the entry with the configured codec and appends it to the file.
   *
   * Synchronized like {@link #write(String, InputMarker)} because both share
   * the same writer.
   *
   * @param jsonObj     fields of the log entry
   * @param inputMarker marker of the input the entry came from
   */
  @Override
  public synchronized void write(Map<String, Object> jsonObj, InputMarker inputMarker)
      throws Exception {
    // Constant-first comparison: codec is still null when init() returned
    // early because no path was configured.
    if (CODEC_CSV.equals(codec)) {
      // TODO: CSV serialization is not implemented; the record is skipped.
      return;
    }
    write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
  }

  /**
   * Appends the block as one line and flushes. Does nothing when the file was
   * never opened or the block is null.
   *
   * @see org.apache.ambari.logfeeder.output.Output#write(String, InputMarker)
   */
  @Override
  synchronized public void write(String block, InputMarker inputMarker) throws Exception {
    if (outWriter != null && block != null) {
      statMetric.count++;

      outWriter.println(block);
      outWriter.flush();
    }
  }

  /**
   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
   */
  @Override
  public String getShortDescription() {
    return "output:destination=file,path=" + filePath;
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java new file mode 100644 index 0000000..c594dd4 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.ambari.logfeeder.output;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;

import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Output that publishes every log entry to a Kafka topic.
 *
 * Messages are sent synchronously until the first send succeeds; after that,
 * when {@code is_async} is enabled, they are sent asynchronously. Messages
 * whose asynchronous send fails are queued in {@link #failedMessages} and
 * re-published by a daemon retry thread started in {@link #init()}. While that
 * queue is not empty, {@link #write(String, InputMarker)} holds back new
 * messages.
 */
public class OutputKafka extends Output {
  static private Logger logger = Logger.getLogger(OutputKafka.class);

  String brokerList = null;
  String topic = null;
  boolean isAsync = true;
  long messageCount = 0;
  int batchSize = 5000;
  int lingerMS = 1000;

  private KafkaProducer<String, String> producer = null;
  BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();

  // Let's start with the assumption Kafka is down. Volatile because the flag
  // is written by the producer callback and the retry thread and read by the
  // threads calling write().
  volatile boolean isKafkaBrokerUp = false;

  /** Seconds to wait before retrying while Kafka is considered down. */
  static final int FAILED_RETRY_INTERVAL = 30;
  /** Seconds to wait while previously failed messages are still being re-sent. */
  static final int CATCHUP_RETRY_INTERVAL = 5;

  /**
   * Reads the configuration, creates the producer and starts the retry thread.
   *
   * Properties: {@code broker_list} and {@code topic} (both mandatory),
   * {@code is_async}, {@code batch_size}, {@code linger_ms}. Every config key
   * starting with {@code kafka.} is passed to the producer with that prefix
   * removed and overrides the defaults set here.
   *
   * @throws Exception when broker_list or topic is missing
   */
  @Override
  public void init() throws Exception {
    super.init();
    statMetric.metricsName = "output.kafka.write_logs";
    writeBytesMetric.metricsName = "output.kafka.write_bytes";

    brokerList = getStringValue("broker_list");
    topic = getStringValue("topic");
    isAsync = getBooleanValue("is_async", true);
    batchSize = getIntValue("batch_size", batchSize);
    lingerMS = getIntValue("linger_ms", lingerMS);

    Map<String, Object> kafkaCustomProperties = new HashMap<String, Object>();
    // Get all kafka custom properties
    for (String key : configs.keySet()) {
      if (key.startsWith("kafka.")) {
        Object value = configs.get(key);
        if (value == null || value.toString().length() == 0) {
          continue;
        }
        String kafkaKey = key.substring("kafka.".length());
        kafkaCustomProperties.put(kafkaKey, value);
      }
    }

    if (StringUtils.isEmpty(brokerList)) {
      throw new Exception(
          "For kafka output, bootstrap broker_list is needed");
    }

    if (StringUtils.isEmpty(topic)) {
      throw new Exception("For kafka output, topic is needed");
    }

    Properties props = new Properties();
    // 0.9.0
    props.put("bootstrap.servers", brokerList);
    props.put("client.id", "logfeeder_producer");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("compression.type", "snappy");
    // props.put("retries", "3");
    props.put("batch.size", batchSize);
    props.put("linger.ms", lingerMS);

    for (Map.Entry<String, Object> custom : kafkaCustomProperties.entrySet()) {
      logger.info("Adding custom Kafka property. " + custom.getKey() + "="
          + custom.getValue());
      props.put(custom.getKey(), custom.getValue());
    }

    // props.put("metadata.broker.list", brokerList);

    producer = new KafkaProducer<String, String>(props);
    Thread retryThread = new Thread("kafka-writer-retry,topic=" + topic) {
      @Override
      public void run() {
        // Message currently being retried; kept across iterations until it
        // has been published.
        KafkaCallBack kafkaCallBack = null;
        logger.info("Started thread to monitor failed messsages. "
            + getShortDescription());
        while (true) {
          try {
            if (kafkaCallBack == null) {
              kafkaCallBack = failedMessages.take();
            }
            if (publishMessage(kafkaCallBack.message,
                kafkaCallBack.inputMarker)) {
              kafkaCallBack = null;
            } else {
              // Should wait for sometime
              logger.error("Kafka is down. messageNumber="
                  + kafkaCallBack.thisMessageNumber
                  + ". Going to sleep for "
                  + FAILED_RETRY_INTERVAL + " seconds");
              Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
            }

          } catch (Throwable t) {
            final String LOG_MESSAGE_KEY = this.getClass()
                .getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
            LogFeederUtil.logErrorMessageByInterval(
                LOG_MESSAGE_KEY,
                "Error sending message to Kafka during retry. message="
                    + (kafkaCallBack == null ? null
                        : kafkaCallBack.message), t,
                logger, Level.ERROR);
          }
        }

      }
    };
    retryThread.setDaemon(true);
    retryThread.start();
  }

  @Override
  public void setDrain(boolean drain) {
    super.setDrain(drain);
  }

  /**
   * Puts the output into drain mode, which makes pending and future
   * {@link #write(String, InputMarker)} calls return.
   */
  public void flush() {
    logger.info("Flush called...");
    setDrain(true);
  }

  /**
   * Drains the output and closes the Kafka producer.
   */
  @Override
  public void close() {
    logger.info("Closing Kafka client...");
    flush();
    if (producer != null) {
      try {
        producer.close();
      } catch (Throwable t) {
        // Keep the cause in the log; shutdown continues regardless.
        logger.error("Error closing Kafka topic. topic=" + topic, t);
      }
    }
    logger.info("Closed Kafka client");
    super.close();
  }

  /**
   * Publishes the block, blocking and retrying until it has been handed to
   * Kafka or until this output or the originating input is drained.
   *
   * New messages are only published while no failed messages are waiting for
   * the retry thread. If the calling thread is interrupted the method gives up
   * on the block and returns with the interrupt flag set.
   *
   * @param block       serialized log entry
   * @param inputMarker marker of the input the entry came from
   */
  @Override
  synchronized public void write(String block, InputMarker inputMarker) throws Exception {
    while (!isDrain() && !inputMarker.input.isDrain()) {
      try {
        if (failedMessages.size() == 0) {
          if (publishMessage(block, inputMarker)) {
            break;
          }
        }
        if (isDrain() || inputMarker.input.isDrain()) {
          break;
        }
        if (!isKafkaBrokerUp) {
          logger.error("Kafka is down. Going to sleep for "
              + FAILED_RETRY_INTERVAL + " seconds");
          Thread.sleep(FAILED_RETRY_INTERVAL * 1000);

        } else {
          logger.warn("Kafka is still catching up from previous failed messages. outstanding messages="
              + failedMessages.size()
              + " Going to sleep for "
              + CATCHUP_RETRY_INTERVAL + " seconds");
          Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
        }
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and stop retrying.
        Thread.currentThread().interrupt();
        break;
      } catch (Throwable t) {
        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
            + "_KAFKA_WRITE_LOOP_ERROR";
        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
            "Unexpected error while writing to Kafka. Giving up on message",
            t, logger, Level.ERROR);
        break;
      }
    }
  }

  /**
   * Sends one message to Kafka.
   *
   * Asynchronous mode is only used once the broker is known to be up; until
   * then the send is synchronous so that the result decides the broker state.
   * Any failure, including one thrown directly by an asynchronous
   * {@code send}, marks the broker as down and is reported through the return
   * value instead of an exception.
   *
   * @return true when the message was sent (sync) or handed to the producer
   *         (async), false when the send failed
   */
  private boolean publishMessage(String block, InputMarker inputMarker) {
    try {
      // Not using key. Let it round robin
      ProducerRecord<String, String> record = new ProducerRecord<String, String>(
          topic, block);
      if (isAsync && isKafkaBrokerUp) { // Send asynchronously
        producer.send(record,
            new KafkaCallBack(this, block, inputMarker, ++messageCount));
        return true;
      }
      // Send synchronously
      RecordMetadata metadata = producer.send(record).get();
      if (metadata != null) {
        statMetric.count++;
        writeBytesMetric.count += block.length();
      }
      if (!isKafkaBrokerUp) {
        logger.info("Started writing to kafka. "
            + getShortDescription());
        isKafkaBrokerUp = true;
      }
      return true;
    } catch (InterruptedException e) {
      // Let the caller see the interrupt; its next blocking call will end.
      Thread.currentThread().interrupt();
      isKafkaBrokerUp = false;
      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
          + "_KAFKA_INTERRUPT";
      LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
          "InterruptedException-Error sending message to Kafka",
          e, logger, Level.ERROR);
    } catch (ExecutionException e) {
      isKafkaBrokerUp = false;
      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
          + "_KAFKA_EXECUTION";
      LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
          "ExecutionException-Error sending message to Kafka", e,
          logger, Level.ERROR);
    } catch (Throwable t) {
      isKafkaBrokerUp = false;
      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
          + "_KAFKA_WRITE_ERROR";
      LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
          "GenericException-Error sending message to Kafka", t,
          logger, Level.ERROR);
    }
    return false;
  }

  /**
   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
   */
  @Override
  public String getShortDescription() {
    return "output:destination=kafka,topic=" + topic;
  }

}

/**
 * Completion callback of one asynchronous send. It carries the message so
 * that a failed send can be queued for the retry thread of the output.
 */
class KafkaCallBack implements Callback {
  static private Logger logger = Logger.getLogger(KafkaCallBack.class);

  long thisMessageNumber;
  OutputKafka output = null;
  String message;
  InputMarker inputMarker;

  public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker,
      long messageCount) {
    this.thisMessageNumber = messageCount;
    this.output = output;
    this.inputMarker = inputMarker;
    this.message = message;
  }

  /**
   * Updates the metrics on success; on failure marks the broker as down and
   * queues this message for retry.
   *
   * Success is decided by the absence of an exception.
   * NOTE(review): newer Kafka clients appear to pass a non-null metadata
   * placeholder on failure, so {@code metadata != null} alone would not be a
   * reliable success test - confirm against the client version in use.
   */
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
      if (!output.isKafkaBrokerUp) {
        logger.info("Started writing to kafka. "
            + output.getShortDescription());
        output.isKafkaBrokerUp = true;
      }
      output.incrementStat(1);
      output.writeBytesMetric.count += message.length();
    } else {
      output.isKafkaBrokerUp = false;
      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
          + "_KAFKA_ASYNC_ERROR";
      LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
          "Error sending message to Kafka. Async Callback",
          exception, logger, Level.ERROR);

      output.failedMessages.add(this);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java new file mode 100644 index 0000000..7cd911d --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.ambari.logfeeder.output;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;

/**
 * Output that indexes log entries into Solr.
 *
 * {@link #write(Map, InputMarker)} puts entries on a bounded queue. A
 * configurable number of worker threads, each with its own Solr client, take
 * entries from the queue, collect them in a local batch and send the batch
 * when it reaches {@code flush_size} documents or when
 * {@code idle_flush_time_ms} has passed since the last dispatch. After a
 * successful dispatch the latest input marker of every file in the batch is
 * checked in with its input.
 */
public class OutputSolr extends Output {
  static private Logger logger = Logger.getLogger(OutputSolr.class);

  private static final String ROUTER_FIELD = "_router_field_";
  private static final int DEFAULT_SPLIT_INTERVAL_MINS = 30;

  String solrUrl = null;
  String zkHosts = null;
  String collection = null;
  String splitMode = "none";
  int splitInterval = 0;
  int numberOfShards = 1;
  boolean isComputeCurrentCollection = false;

  int maxBufferSize = 5000;
  int maxIntervalMS = 3000;
  int workers = 1;

  BlockingQueue<OutputData> outgoingBuffer = null;
  List<SolrWorkerThread> writerThreadList = new ArrayList<SolrWorkerThread>();
  /** Seconds a worker sleeps between pings while Solr is unreachable. */
  private static final int RETRY_INTERVAL = 30;

  // Last shard slot that was logged; only used to log shard switches.
  int lastSlotByMin = -1;

  /**
   * Reads the configuration, creates the queue and starts the workers.
   *
   * Either {@code url} (one URL, or several comma separated URLs for a load
   * balanced client) or {@code zk_hosts} together with {@code collection}
   * (SolrCloud) must be configured. Values below 1 for the batch size, the
   * worker count, the shard count or the split interval are replaced by a
   * usable value and a warning is logged.
   *
   * @throws Exception when neither url nor zk_hosts is set, or when the
   *                   collection is missing in SolrCloud mode
   */
  @Override
  public void init() throws Exception {
    super.init();
    statMetric.metricsName = "output.solr.write_logs";
    writeBytesMetric.metricsName = "output.solr.write_bytes";

    solrUrl = getStringValue("url");
    zkHosts = getStringValue("zk_hosts");
    // NOTE(review): the split mode is read from "splits_interval_mins" while
    // the interval is read from "split_interval_mins". That looks like a typo
    // in the key name, but existing configurations may rely on it - confirm
    // before renaming.
    splitMode = getStringValue("splits_interval_mins", splitMode);
    if (!splitMode.equalsIgnoreCase("none")) {
      splitInterval = getIntValue("split_interval_mins",
          DEFAULT_SPLIT_INTERVAL_MINS);
      if (splitInterval < 1) {
        // The interval is used as a divisor when computing the shard.
        logger.warn("splitInterval is less than 1. Making it "
            + DEFAULT_SPLIT_INTERVAL_MINS);
        splitInterval = DEFAULT_SPLIT_INTERVAL_MINS;
      }
    }
    numberOfShards = getIntValue("number_of_shards", numberOfShards);
    if (numberOfShards < 1) {
      // Used as a modulus when computing the shard.
      logger.warn("numberOfShards is less than 1. Making it 1");
      numberOfShards = 1;
    }

    maxBufferSize = getIntValue("flush_size", maxBufferSize);
    if (maxBufferSize < 1) {
      logger.warn("maxBufferSize is less than 1. Making it 1");
      maxBufferSize = 1;
    }
    maxIntervalMS = getIntValue("idle_flush_time_ms", maxIntervalMS);
    workers = getIntValue("workers", workers);
    if (workers < 1) {
      // Without a worker nothing would ever consume the queue.
      logger.warn("workers is less than 1. Making it 1");
      workers = 1;
    }

    logger.info("Config: Number of workers=" + workers + ", splitMode="
        + splitMode + ", splitInterval=" + splitInterval
        + ", numberOfShards=" + numberOfShards + ". "
        + getShortDescription());

    if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkHosts)) {
      throw new Exception(
          "For solr output, either url or zk_hosts property need to be set");
    }

    int bufferSize = maxBufferSize * (workers + 3);
    logger.info("Creating blocking queue with bufferSize=" + bufferSize);
    // outgoingBuffer = new ArrayBlockingQueue<OutputData>(bufferSize);
    outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize);

    for (int count = 0; count < workers; count++) {
      SolrClient solrClient = null;
      // Same emptiness test as the validation above, so that an empty
      // zk_hosts combined with a url selects the url based client.
      if (!StringUtils.isEmpty(zkHosts)) {
        logger.info("Using zookeepr. zkHosts=" + zkHosts);
        collection = getStringValue("collection");
        if (StringUtils.isEmpty(collection)) {
          throw new Exception(
              "For solr cloud property collection is mandatory");
        }
        logger.info("Using collection=" + collection);
        CloudSolrClient solrCloudClient = new CloudSolrClient(zkHosts);
        solrCloudClient.setDefaultCollection(collection);
        solrClient = solrCloudClient;
        isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
      } else {
        String[] solrUrls = StringUtils.split(solrUrl, ",");
        if (solrUrls.length == 1) {
          logger.info("Using SolrURL=" + solrUrl);
          solrClient = new HttpSolrClient(solrUrl);
        } else {
          logger.info("Using load balance solr client. solrUrls="
              + solrUrl);
          logger.info("Initial URL for LB solr=" + solrUrls[0]);
          @SuppressWarnings("resource")
          LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(
              solrUrls[0]);
          for (int i = 1; i < solrUrls.length; i++) {
            logger.info("Adding URL for LB solr=" + solrUrls[i]);
            lbSolrClient.addSolrServer(solrUrls[i]);
          }
          solrClient = lbSolrClient;
        }
      }
      // A failed ping is only reported; the worker is started anyway.
      try {
        logger.info("Pinging Solr server. zkHosts=" + zkHosts
            + ", urls=" + solrUrl);
        SolrPingResponse response = solrClient.ping();
        if (response.getStatus() == 0) {
          logger.info("Ping to Solr server is successful for writer="
              + count);
        } else {
          logger.warn("Ping to Solr server failed. It would check again. writer="
              + count
              + ", solrUrl="
              + solrUrl
              + ", zkHosts="
              + zkHosts
              + ", collection="
              + collection
              + ", response=" + response);
        }
      } catch (Throwable t) {
        logger.warn(
            "Ping to Solr server failed. It would check again. writer="
                + count + ", solrUrl=" + solrUrl + ", zkHosts="
                + zkHosts + ", collection=" + collection, t);
      }

      // Let's start the thread
      SolrWorkerThread solrWriterThread = new SolrWorkerThread(solrClient);
      solrWriterThread.setName(getNameForThread() + "," + collection
          + ",writer=" + count);
      solrWriterThread.setDaemon(true);
      solrWriterThread.start();
      writerThreadList.add(solrWriterThread);
    }
  }

  @Override
  public void setDrain(boolean drain) {
    super.setDrain(drain);
  }

  /**
   * Puts the output into drain mode and gives the workers up to 30 seconds to
   * send what they have buffered. Workers with an empty local batch are
   * interrupted so that they wake up from a blocking poll and notice the
   * drain. Returns early when no worker has anything pending or when the
   * calling thread is interrupted.
   */
  public void flush() {
    logger.info("Flush called...");
    setDrain(true);

    int wrapUpTimeSecs = 30;
    // Give wrapUpTimeSecs seconds to wrap up
    for (int i = 0; i < wrapUpTimeSecs; i++) {
      boolean isPending = false;
      for (SolrWorkerThread solrWorkerThread : writerThreadList) {
        if (solrWorkerThread.isDone()) {
          try {
            solrWorkerThread.interrupt();
          } catch (Throwable t) {
            // ignore
          }
        } else {
          isPending = true;
        }
      }
      if (!isPending) {
        break;
      }
      try {
        logger.info("Will give " + (wrapUpTimeSecs - i)
            + " seconds to wrap up");
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Stop waiting, but keep the interrupt visible to the caller.
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  /**
   * @return total number of documents sitting in the local batches of the
   *         workers; entries still on the queue are not counted
   */
  @Override
  public long getPendingCount() {
    long totalCount = 0;
    for (SolrWorkerThread solrWorkerThread : writerThreadList) {
      totalCount += solrWorkerThread.localBuffer.size();
    }
    return totalCount;
  }

  @Override
  public void close() {
    logger.info("Closing Solr client...");
    flush();

    logger.info("Closed Solr client");
    super.close();
  }

  /**
   * Queues the entry for the workers, blocking while the queue is full. If
   * the calling thread is interrupted while waiting, the entry is not queued
   * and the interrupt flag is left set.
   *
   * @param jsonObj     fields of the log entry
   * @param inputMarker marker of the input the entry came from
   */
  @Override
  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
      throws Exception {
    try {
      outgoingBuffer.put(new OutputData(jsonObj, inputMarker));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
   */
  @Override
  public String getShortDescription() {
    return "output:destination=solr,collection=" + collection;
  }

  /**
   * Worker that batches queue entries and sends them with its own client.
   */
  class SolrWorkerThread extends Thread {
    SolrClient solrClient = null;
    /** Documents collected since the last dispatch. */
    Collection<SolrInputDocument> localBuffer = new ArrayList<SolrInputDocument>();
    /** Sum of the string lengths of the field values in the local batch. */
    long localBufferBytesSize = 0;
    /** Latest marker per file key for the documents in the local batch. */
    Map<String, InputMarker> latestInputMarkerList = new HashMap<String, InputMarker>();

    public SolrWorkerThread(SolrClient solrClient) {
      this.solrClient = solrClient;
    }

    /**
     * Main loop: take an entry (waiting at most until the next dispatch is
     * due), add it to the batch and dispatch the batch when it is full or
     * due. In drain mode, once the queue is empty, a remaining batch gets one
     * final dispatch attempt before the thread exits.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
      logger.info("SolrWriter thread started");
      long lastDispatchTime = System.currentTimeMillis();

      while (true) {
        long currTimeMS = System.currentTimeMillis();
        OutputData outputData = null;
        try {
          long nextDispatchDuration = maxIntervalMS
              - (currTimeMS - lastDispatchTime);
          outputData = outgoingBuffer.poll();
          if (outputData == null && !isDrain()
              && nextDispatchDuration > 0) {
            outputData = outgoingBuffer.poll(nextDispatchDuration,
                TimeUnit.MILLISECONDS);
          }

          boolean nothingLeftToRead = isDrain() && outputData == null
              && outgoingBuffer.size() == 0;
          if (nothingLeftToRead && localBuffer.isEmpty()) {
            break;
          }
          if (outputData != null) {
            bufferDocument(outputData);
          }

          if (localBuffer.size() > 0
              && ((outputData == null && isDrain())
                  || nextDispatchDuration <= 0
                  || localBuffer.size() >= maxBufferSize)) {
            if (dispatch(outputData)) {
              lastDispatchTime = System.currentTimeMillis();
            }
          }
          if (nothingLeftToRead) {
            // The final batch had its one attempt. Exiting before that
            // attempt would discard the buffered documents.
            break;
          }
        } catch (InterruptedException e) {
          // Woken up while waiting on the queue (flush() interrupts idle
          // workers); loop around and re-check the drain state.
        } catch (Throwable t) {
          final String LOG_MESSAGE_KEY = this.getClass()
              .getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
              "Caught exception in main loop. " + outputData, t,
              logger, Level.ERROR);
        }
      }

      if (solrClient != null) {
        try {
          solrClient.close();
        } catch (IOException e) {
          // Ignore
        }
      }

      resetLocalBuffer();
      logger.info("Exiting Solr writer thread. output="
          + getShortDescription());
    }

    /**
     * Converts the entry to a Solr document (assigning a random id when the
     * entry has none), adds it to the local batch and remembers its marker.
     */
    private void bufferDocument(OutputData outputData) {
      if (outputData.jsonObj.get("id") == null) {
        outputData.jsonObj.put("id", UUID.randomUUID().toString());
      }
      SolrInputDocument document = new SolrInputDocument();
      for (Map.Entry<String, Object> field : outputData.jsonObj.entrySet()) {
        Object obj = field.getValue();
        document.addField(field.getKey(), obj);
        if (obj == null) {
          // Nothing to count for a missing value.
          continue;
        }
        try {
          localBufferBytesSize += obj.toString().length();
        } catch (Throwable t) {
          final String LOG_MESSAGE_KEY = this.getClass()
              .getSimpleName() + "_BYTE_COUNT_ERROR";
          LogFeederUtil.logErrorMessageByInterval(
              LOG_MESSAGE_KEY,
              "Error calculating byte size. object="
                  + obj, t, logger, Level.ERROR);
        }
      }
      latestInputMarkerList.put(
          outputData.inputMarker.base64FileKey,
          outputData.inputMarker);
      localBuffer.add(document);
    }

    /**
     * Sends the local batch to Solr.
     *
     * On success the metrics are updated, the markers are checked in and the
     * batch is cleared. On an IOException the batch is kept and the method
     * waits until Solr answers a ping or the output is drained. On any other
     * failure the batch is dropped.
     *
     * @param outputData last entry read, only used in log messages
     * @return true when the batch was sent
     */
    private boolean dispatch(OutputData outputData) {
      try {
        if (isComputeCurrentCollection) {
          tagWithCurrentShard();
        }

        UpdateResponse response = solrClient.add(localBuffer);
        if (response.getStatus() != 0) {
          final String LOG_MESSAGE_KEY = this.getClass()
              .getSimpleName() + "_SOLR_UPDATE_ERROR";
          LogFeederUtil.logErrorMessageByInterval(
              LOG_MESSAGE_KEY,
              "Error writing to Solr. response="
                  + response.toString()
                  + ", log="
                  + (outputData == null ? null
                      : outputData.toString()),
              null, logger, Level.ERROR);
        }
        statMetric.count += localBuffer.size();
        writeBytesMetric.count += localBufferBytesSize;
        for (InputMarker inputMarker : latestInputMarkerList.values()) {
          inputMarker.input.checkIn(inputMarker);
        }

        resetLocalBuffer();
        return true;
      } catch (IOException ioException) {
        // Transient error, lets block till it is available
        waitForSolr();
      } catch (Throwable serverException) {
        // Clear the buffer
        resetLocalBuffer();
        final String LOG_MESSAGE_KEY = this.getClass()
            .getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
        LogFeederUtil.logErrorMessageByInterval(
            LOG_MESSAGE_KEY,
            "Error sending log message to server. "
                + (outputData == null ? null
                    : outputData.toString()),
            serverException, logger, Level.ERROR);
      }
      return false;
    }

    /**
     * Computes the shard for the current time slot and stores it in the
     * router field of every document in the local batch.
     */
    private void tagWithCurrentShard() {
      // One Calendar instance so that day, hour and minute belong to the
      // same instant.
      Calendar now = Calendar.getInstance();
      int weekDay = now.get(Calendar.DAY_OF_WEEK);
      int currHour = now.get(Calendar.HOUR_OF_DAY);
      int currMin = now.get(Calendar.MINUTE);

      int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin;
      int slotByMin = minOfWeek / splitInterval % numberOfShards;

      String shard = "shard" + slotByMin;

      if (lastSlotByMin != slotByMin) {
        logger.info("Switching to shard " + shard
            + ", output="
            + getShortDescription());
        lastSlotByMin = slotByMin;
      }

      for (SolrInputDocument solrInputDocument : localBuffer) {
        // setField replaces the value. A batch that is retried after an
        // IOException passes through here again, and addField would append
        // another value each time.
        solrInputDocument.setField(ROUTER_FIELD, shard);
      }
    }

    /**
     * Sleeps in RETRY_INTERVAL steps until Solr answers a ping, the output is
     * drained or the sleep is interrupted.
     */
    private void waitForSolr() {
      while (!isDrain()) {
        try {
          logger.warn("Solr is down. Going to sleep for "
              + RETRY_INTERVAL
              + " seconds. output="
              + getShortDescription());
          Thread.sleep(RETRY_INTERVAL * 1000);
        } catch (Throwable t) {
          // Interrupted or failed to sleep: stop waiting.
          break;
        }
        if (isDrain()) {
          break;
        }
        try {
          SolrPingResponse pingResponse = solrClient.ping();
          if (pingResponse.getStatus() == 0) {
            logger.info("Solr seems to be up now. Resuming... output="
                + getShortDescription());
            break;
          }
        } catch (Throwable t) {
          // Still down; keep waiting.
        }
      }
    }

    /**
     * @return true when the local batch is empty
     */
    public boolean isDone() {
      return localBuffer.size() == 0;
    }

    /**
     * Clears the local batch together with its byte count and markers.
     */
    public void resetLocalBuffer() {
      localBuffer.clear();
      localBufferBytesSize = 0;
      latestInputMarkerList.clear();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java new file mode 100644 index 0000000..4265dc6 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.util; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; +import org.apache.log4j.Logger; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.SolrRequest.METHOD; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; + +public class SolrUtil { + + private static Logger logger = Logger.getLogger(SolrUtil.class); + + private static SolrUtil instance = null; + SolrClient solrClient = null; + CloudSolrClient solrClouldClient = null; + + boolean isSolrCloud = true; + String solrDetail = ""; + String collectionName = null; + + private SolrUtil() throws Exception { + String url = LogFeederUtil.getStringProperty("logfeeder.solr.url"); + String zkHosts = LogFeederUtil.getStringProperty("logfeeder.solr.zkhosts"); + String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.history", "history"); + connectToSolr(url, zkHosts, collection); + } + + public static SolrUtil getInstance() { + if (instance == null) { + synchronized (SolrUtil.class) { + if (instance == null) { + try { + instance = new SolrUtil(); + } catch (Exception e) { + logger.error(e); + } + } + } + } + return instance; + } + + public SolrClient connectToSolr(String url, String zkHosts, + String collection) throws Exception { + this.collectionName = 
collection; + solrDetail = "zkHosts=" + zkHosts + ", collection=" + collection + + ", url=" + url; + + logger.info("connectToSolr() " + solrDetail); + if (collection == null || collection.isEmpty()) { + throw new Exception("For solr, collection name is mandatory. " + + solrDetail); + } + if (zkHosts != null && !zkHosts.isEmpty()) { + solrDetail = "zkHosts=" + zkHosts + ", collection=" + collection; + logger.info("Using zookeepr. " + solrDetail); + solrClouldClient = new CloudSolrClient(zkHosts); + solrClouldClient.setDefaultCollection(collection); + solrClient = solrClouldClient; + int waitDurationMS = 3 * 60 * 1000; + checkSolrStatus(waitDurationMS); + } else { + if (url == null || url.trim().isEmpty()) { + throw new Exception("Both zkHosts and URL are empty. zkHosts=" + + zkHosts + ", collection=" + collection + ", url=" + + url); + } + solrDetail = "collection=" + collection + ", url=" + url; + String collectionURL = url + "/" + collection; + logger.info("Connecting to solr : " + collectionURL); + solrClient = new HttpSolrClient(collectionURL); + + } + return solrClient; + } + + /** + * @param waitDurationMS + * @return + */ + public boolean checkSolrStatus(int waitDurationMS) { + boolean status = false; + try { + long beginTimeMS = System.currentTimeMillis(); + long waitIntervalMS = 2000; + int pingCount = 0; + while (true) { + pingCount++; + CollectionAdminResponse response = null; + try { + CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List(); + response = colListReq.process(solrClient); + } catch (Exception ex) { + logger.error("Con't connect to Solr. solrDetail=" + solrDetail, ex); + } + if (response != null && response.getStatus() == 0) { + logger.info("Solr getCollections() is success. solr=" + solrDetail); + status = true; + break; + } + if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) { + logger.error("Solr is not reachable even after " + + (System.currentTimeMillis() - beginTimeMS) + + " ms. 
If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr=" + + solrDetail + ", response=" + response); + break; + } else { + logger.warn("Solr is not not reachable yet. getCollections() attempt count=" + pingCount + + ". Will sleep for " + waitIntervalMS + " ms and try again." + " solr=" + solrDetail + + ", response=" + response); + + } + Thread.sleep(waitIntervalMS); + } + } catch (Throwable t) { + logger.error("Seems Solr is not up. solrDetail=" + solrDetail); + } + return status; + } + + /** + * @param solrQuery + * @return + * @throws SolrServerException + * @throws IOException + * @throws SolrException + */ + public QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException { + if (solrClient != null) { + QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST); + return queryResponse; + } else { + return null; + } + } + + /** + * @return + */ + public HashMap<String, Object> getConfigDoc() { + HashMap<String, Object> configMap = new HashMap<String, Object>(); + SolrQuery solrQuery = new SolrQuery(); + solrQuery.setQuery("*:*"); + String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.NAME; + solrQuery.setFilterQueries(fq); + try { + QueryResponse response = SolrUtil.getInstance().process(solrQuery); + SolrDocumentList documentList = response.getResults(); + if (documentList != null && documentList.size() > 0) { + SolrDocument configDoc = documentList.get(0); + String configJson = LogFeederUtil.getGson().toJson(configDoc); + configMap = (HashMap<String, Object>) LogFeederUtil.toJSONObject(configJson); + } + } catch (SolrException | SolrServerException | IOException e) { + logger.error(e); + } + return configMap; + } + + /** + * @param solrInputDocument + * @throws SolrServerException + * @throws IOException + */ + public void addDoc(SolrInputDocument solrInputDocument) throws SolrServerException, IOException { + solrClient.add(solrInputDocument); + 
solrClient.commit(); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java new file mode 100644 index 0000000..f030040 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.view; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class VLogfeederFilter { + + private String label; + private List<String> hosts; + private List<String> defaultLevels; + private List<String> overrideLevels; + private String expiryTime; + + public VLogfeederFilter() { + hosts = new ArrayList<String>(); + defaultLevels = new ArrayList<String>(); + overrideLevels = new ArrayList<String>(); + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public List<String> getHosts() { + return hosts; + } + + public void setHosts(List<String> hosts) { + this.hosts = hosts; + } + + public List<String> getDefaultLevels() { + return defaultLevels; + } + + public void setDefaultLevels(List<String> defaultLevels) { + this.defaultLevels = defaultLevels; + } + + public List<String> getOverrideLevels() { + return overrideLevels; + } + + public void setOverrideLevels(List<String> overrideLevels) { + this.overrideLevels = overrideLevels; + } + + public String getExpiryTime() { + return expiryTime; + } + + public void setExpiryTime(String expiryTime) { + this.expiryTime = expiryTime; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java 
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java new file mode 100644 index 0000000..4ddef3f --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.view; + +import java.util.HashMap; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class VLogfeederFilterWrapper { + + private HashMap<String, VLogfeederFilter> filter; + private String id; + + public HashMap<String, VLogfeederFilter> getFilter() { + return filter; + } + + public void setFilter(HashMap<String, VLogfeederFilter> filter) { + this.filter = filter; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java new file mode 100644 index 0000000..956af16 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; + +public abstract class AbstractTimelineMetricsSink { + public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix."; + public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize"; + public static final String METRICS_SEND_INTERVAL = "sendInterval"; + public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout"; + public static final String COLLECTOR_HOST_PROPERTY = "collector"; + public static final String COLLECTOR_PORT_PROPERTY = "port"; + public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; + + protected final Log LOG; + + protected static ObjectMapper mapper; + + static { + mapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); + mapper.setAnnotationIntrospector(introspector); + 
mapper.getSerializationConfig() + .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + } + + public AbstractTimelineMetricsSink() { + LOG = LogFactory.getLog(this.getClass()); + } + + protected void emitMetrics(TimelineMetrics metrics) { + String connectUrl = getCollectorUri(); + int timeout = getTimeoutSeconds() * 1000; + try { + String jsonData = mapper.writeValueAsString(metrics); + LOG.info("Posting JSON=" + jsonData); + + HttpURLConnection connection = + (HttpURLConnection) new URL(connectUrl).openConnection(); + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + connection.setDoOutput(true); + + if (jsonData != null) { + try (OutputStream os = connection.getOutputStream()) { + os.write(jsonData.getBytes("UTF-8")); + } + } + + int statusCode = connection.getResponseCode(); + + if (statusCode != 200) { + LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + + "statusCode = " + statusCode); + } else { + LOG.debug("Metrics posted to Collector " + connectUrl); + } + } catch (IOException e) { + throw new UnableToConnectException(e).setConnectUrl(connectUrl); + } + } + + abstract protected String getCollectorUri(); + + abstract protected int getTimeoutSeconds(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java new file mode 100644 index 0000000..31044cc --- /dev/null +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +/** + * Is used to determine metrics aggregate table. 
/**
 * Time granularity used to determine the metrics aggregate table.
 *
 * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
 */
public enum Precision {
  SECONDS,
  MINUTES,
  HOURS,
  DAYS;

  private static final long HOUR_MS = 3600000; // 1 hour
  private static final long DAY_MS = 86400000; // 1 day

  /** Thrown when a precision name does not match any constant. */
  public static class PrecisionFormatException extends IllegalArgumentException {
    private static final long serialVersionUID = 1L;

    public PrecisionFormatException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /**
   * Parses a precision name, ignoring case.
   *
   * @param precision name such as {@code "seconds"}; may be null
   * @return the matching constant, or {@code null} when the input is null
   * @throws PrecisionFormatException when the name matches no constant
   */
  public static Precision getPrecision(String precision) throws PrecisionFormatException {
    if (precision == null) {
      return null;
    }
    try {
      // Locale.ROOT keeps the upper-casing independent of the JVM default
      // locale; under a Turkish locale "minutes" would otherwise become
      // "MİNUTES" and be rejected.
      return Precision.valueOf(precision.toUpperCase(java.util.Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new PrecisionFormatException("precision should be seconds, " +
        "minutes, hours or days", e);
    }
  }

  /**
   * Chooses a precision from the length of a time range.
   *
   * @param startTime range start, epoch milliseconds
   * @param endTime   range end, epoch milliseconds
   * @return DAYS above 30 days, HOURS above 1 day, MINUTES above 2 hours,
   *         otherwise SECONDS
   */
  public static Precision getPrecision(long startTime, long endTime) {
    long timeRange = endTime - startTime;
    if (timeRange > 30 * DAY_MS) {
      return Precision.DAYS;
    } else if (timeRange > 1 * DAY_MS) {
      return Precision.HOURS;
    } else if (timeRange > 2 * HOUR_MS) {
      return Precision.MINUTES;
    } else {
      return Precision.SECONDS;
    }
  }

  /**
   * Returns the next coarser precision.
   *
   * @param precision current precision; may be null
   * @return the next coarser constant, or {@code null} for DAYS or null input
   */
  public static Precision getHigherPrecision(Precision precision) {
    if (precision == null) {
      return null;
    }
    switch (precision) {
      case SECONDS:
        return Precision.MINUTES;
      case MINUTES:
        return Precision.HOURS;
      case HOURS:
        return Precision.DAYS;
      default:
        return null;
    }
  }
}
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/PrecisionLimitExceededException.java new file mode 100644 index 0000000..962a071 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/PrecisionLimitExceededException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Unchecked exception signalling that a precision limit was exceeded.
 * Offers the message-only, cause-only and message-plus-cause constructors
 * of {@link IllegalArgumentException}.
 */
public class PrecisionLimitExceededException extends IllegalArgumentException {

  private static final long serialVersionUID = 1L;

  /**
   * @param message detail message
   */
  public PrecisionLimitExceededException(String message) {
    super(message);
  }

  /**
   * @param cause underlying cause
   */
  public PrecisionLimitExceededException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message detail message
   * @param cause   underlying cause
   */
  public PrecisionLimitExceededException(String message, Throwable cause) {
    super(message, cause);
  }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +/** + * This class prevents creating a TreeMap for every instantiation of a metric + * read from the store. The methods are meant to provide interoperability + * with @TimelineMetric + */ +public class SingleValuedTimelineMetric { + private Long timestamp; + private Double value; + private String metricName; + private String appId; + private String instanceId; + private String hostName; + private Long startTime; + private String type; + + public void setSingleTimeseriesValue(Long timestamp, Double value) { + this.timestamp = timestamp; + this.value = value; + } + + public SingleValuedTimelineMetric(String metricName, String appId, + String instanceId, String hostName, + long timestamp, long startTime, String type) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.hostName = hostName; + this.timestamp = timestamp; + this.startTime = startTime; + this.type = type; + } + + public Long getTimestamp() { + return timestamp; + } + + public long getStartTime() { + return startTime; + } + + public String getType() { + return type; + } + + public Double getValue() { + return value; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + public String getInstanceId() { + return instanceId; + } + + public String getHostName() { + return hostName; + } + + public boolean equalsExceptTime(TimelineMetric metric) { + if (!metricName.equals(metric.getMetricName())) return false; + if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null) + return false; + if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null) + return false; + if (instanceId != null ? 
!instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false; + + return true; + } + + public TimelineMetric getTimelineMetric() { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(this.metricName); + metric.setAppId(this.appId); + metric.setHostName(this.hostName); + metric.setType(this.type); + metric.setInstanceId(this.instanceId); + metric.setStartTime(this.startTime); + metric.setTimestamp(this.timestamp); + metric.getMetricValues().put(timestamp, value); + return metric; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java new file mode 100644 index 0000000..0e74f2d --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +import java.util.Map; +import java.util.TreeMap; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.codehaus.jackson.map.annotate.JsonDeserialize; + +@XmlRootElement(name = "metric") +@XmlAccessorType(XmlAccessType.NONE) +public class TimelineMetric implements Comparable<TimelineMetric> { + + private String metricName; + private String appId; + private String instanceId; + private String hostName; + private long timestamp; + private long startTime; + private String type; + private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + + // default + public TimelineMetric() { + + } + + // copy constructor + public TimelineMetric(TimelineMetric metric) { + setMetricName(metric.getMetricName()); + setType(metric.getType()); + setTimestamp(metric.getTimestamp()); + setAppId(metric.getAppId()); + setInstanceId(metric.getInstanceId()); + setHostName(metric.getHostName()); + setStartTime(metric.getStartTime()); + setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues())); + } + + @XmlElement(name = "metricname") + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + @XmlElement(name = "appid") + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + @XmlElement(name = "instanceid") + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + @XmlElement(name = "hostname") + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + 
this.hostName = hostName; + } + + @XmlElement(name = "timestamp") + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @XmlElement(name = "starttime") + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + @XmlElement(name = "type") + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @XmlElement(name = "metrics") + public TreeMap<Long, Double> getMetricValues() { + return metricValues; + } + + public void setMetricValues(TreeMap<Long, Double> metricValues) { + this.metricValues = metricValues; + } + + public void addMetricValues(Map<Long, Double> metricValues) { + this.metricValues.putAll(metricValues); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineMetric metric = (TimelineMetric) o; + + if (!metricName.equals(metric.metricName)) return false; + if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) + return false; + if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + if (timestamp != metric.timestamp) return false; + if (startTime != metric.startTime) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) + return false; + if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) + return false; + if (instanceId != null ? 
!instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (hostName != null ? hostName.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public int compareTo(TimelineMetric other) { + if (timestamp > other.timestamp) { + return -1; + } else if (timestamp < other.timestamp) { + return 1; + } else { + return metricName.compareTo(other.metricName); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java new file mode 100644 index 0000000..11ca665 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * The class that hosts a list of timeline entities. + */ +@XmlRootElement(name = "metrics") +@XmlAccessorType(XmlAccessType.NONE) +public class TimelineMetrics { + + private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>(); + + public TimelineMetrics() {} + + @XmlElement(name = "metrics") + public List<TimelineMetric> getMetrics() { + return allMetrics; + } + + public void setMetrics(List<TimelineMetric> allMetrics) { + this.allMetrics = allMetrics; + } + + private boolean isEqualTimelineMetrics(TimelineMetric metric1, + TimelineMetric metric2) { + + boolean isEqual = true; + + if (!metric1.getMetricName().equals(metric2.getMetricName())) { + return false; + } + + if (metric1.getHostName() != null) { + isEqual = metric1.getHostName().equals(metric2.getHostName()); + } + + if (metric1.getAppId() != null) { + isEqual = metric1.getAppId().equals(metric2.getAppId()); + } + + return isEqual; + } + + /** + * Merge with existing TimelineMetric if everything except startTime is + * the same. 
+ * @param metric {@link TimelineMetric} + */ + public void addOrMergeTimelineMetric(TimelineMetric metric) { + TimelineMetric metricToMerge = null; + + if (!allMetrics.isEmpty()) { + for (TimelineMetric timelineMetric : allMetrics) { + if (timelineMetric.equalsExceptTime(metric)) { + metricToMerge = timelineMetric; + break; + } + } + } + + if (metricToMerge != null) { + metricToMerge.addMetricValues(metric.getMetricValues()); + if (metricToMerge.getTimestamp() > metric.getTimestamp()) { + metricToMerge.setTimestamp(metric.getTimestamp()); + } + if (metricToMerge.getStartTime() > metric.getStartTime()) { + metricToMerge.setStartTime(metric.getStartTime()); + } + } else { + allMetrics.add(metric); + } + } + + // Optimization that addresses too many TreeMaps from getting created. + public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) { + TimelineMetric metricToMerge = null; + + if (!allMetrics.isEmpty()) { + for (TimelineMetric timelineMetric : allMetrics) { + if (metric.equalsExceptTime(timelineMetric)) { + metricToMerge = timelineMetric; + break; + } + } + } + + if (metricToMerge != null) { + metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue()); + if (metricToMerge.getTimestamp() > metric.getTimestamp()) { + metricToMerge.setTimestamp(metric.getTimestamp()); + } + if (metricToMerge.getStartTime() > metric.getStartTime()) { + metricToMerge.setStartTime(metric.getStartTime()); + } + } else { + allMetrics.add(metric.getTimelineMetric()); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java new file mode 100644 index 0000000..797924f --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Unchecked exception signalling that a connection could not be established.
 * The URL that was being contacted can optionally be attached after
 * construction through the chainable {@link #setConnectUrl(String)}.
 */
public class UnableToConnectException extends RuntimeException {

  private static final long serialVersionUID = 1L;

  /** URL of the failed connection attempt; {@code null} until set. */
  private String connectUrl;

  /**
   * @param cause the underlying failure
   */
  public UnableToConnectException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message description of the failure
   */
  public UnableToConnectException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause the underlying failure
   */
  public UnableToConnectException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @return the URL recorded with {@link #setConnectUrl(String)}, or
   *         {@code null} if none was recorded
   */
  public String getConnectUrl() {
    return this.connectUrl;
  }

  /**
   * Records the URL that could not be reached.
   *
   * @param connectUrl the URL of the failed connection attempt
   * @return this exception, so the call can be chained into a {@code throw}
   */
  public UnableToConnectException setConnectUrl(String connectUrl) {
    this.connectUrl = connectUrl;
    return this;
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.cache; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TreeMap; + +public class TimelineMetricsCache { + + private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder(); + private static final Log LOG = LogFactory.getLog(TimelineMetric.class); + public static final int MAX_RECS_PER_NAME_DEFAULT = 10000; + public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min + private final int maxRecsPerName; + private final int maxEvictionTimeInMillis; + private final Map<String, Double> counterMetricLastValue = new HashMap<String, Double>(); + + public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) { + this.maxRecsPerName = maxRecsPerName; + this.maxEvictionTimeInMillis = maxEvictionTimeInMillis; + } + + class TimelineMetricWrapper { + private long timeDiff = -1; + private long oldestTimestamp = -1; + private TimelineMetric timelineMetric; + + TimelineMetricWrapper(TimelineMetric timelineMetric) { + this.timelineMetric = timelineMetric; + this.oldestTimestamp = timelineMetric.getStartTime(); + } + + private void updateTimeDiff(long timestamp) { + if (oldestTimestamp != -1 && timestamp > oldestTimestamp) { + timeDiff = timestamp - oldestTimestamp; + } else { + oldestTimestamp = timestamp; + } + } + + public void putMetric(TimelineMetric metric) { + 
this.timelineMetric.addMetricValues(metric.getMetricValues()); + updateTimeDiff(metric.getStartTime()); + } + + public long getTimeDiff() { + return timeDiff; + } + + public TimelineMetric getTimelineMetric() { + return timelineMetric; + } + } + + // TODO: Change to ConcurentHashMap with weighted eviction + class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {// + private static final long serialVersionUID = 1L; + private boolean gotOverflow = false; + // To avoid duplication at the end of the buffer and beginning of the next + // segment of values + private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>(); + + @Override + protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) { + boolean overflow = size() > maxRecsPerName; + if (overflow && !gotOverflow) { + LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest); + gotOverflow = true; + } + return overflow; + } + + public TimelineMetric evict(String metricName) { + TimelineMetricWrapper metricWrapper = this.get(metricName); + + if (metricWrapper == null + || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) { + return null; + } + + TimelineMetric timelineMetric = metricWrapper.getTimelineMetric(); + this.remove(metricName); + + return timelineMetric; + } + + public void put(String metricName, TimelineMetric timelineMetric) { + if (isDuplicate(timelineMetric)) { + return; + } + TimelineMetricWrapper metric = this.get(metricName); + if (metric == null) { + this.put(metricName, new TimelineMetricWrapper(timelineMetric)); + } else { + metric.putMetric(timelineMetric); + } + // Buffer last ts value + endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime()); + } + + /** + * Test whether last buffered timestamp is same as the newly received. 
+ * @param timelineMetric @TimelineMetric + * @return true/false + */ + private boolean isDuplicate(TimelineMetric timelineMetric) { + return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName()) + && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime()); + } + } + + public TimelineMetric getTimelineMetric(String metricName) { + if (timelineMetricCache.containsKey(metricName)) { + return timelineMetricCache.evict(metricName); + } + + return null; + } + + /** + * Getter method to help testing eviction + * @return @int + */ + public int getMaxEvictionTimeInMillis() { + return maxEvictionTimeInMillis; + } + + public void putTimelineMetric(TimelineMetric timelineMetric) { + timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric); + } + + private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) { + String metricName = timelineMetric.getMetricName(); + double firstValue = timelineMetric.getMetricValues().size() > 0 + ? timelineMetric.getMetricValues().entrySet().iterator().next().getValue() : 0; + Double value = counterMetricLastValue.get(metricName); + double previousValue = value != null ? value : firstValue; + Map<Long, Double> metricValues = timelineMetric.getMetricValues(); + TreeMap<Long, Double> newMetricValues = new TreeMap<Long, Double>(); + for (Map.Entry<Long, Double> entry : metricValues.entrySet()) { + newMetricValues.put(entry.getKey(), entry.getValue() - previousValue); + previousValue = entry.getValue(); + } + timelineMetric.setMetricValues(newMetricValues); + counterMetricLastValue.put(metricName, previousValue); + } + + public void putTimelineMetric(TimelineMetric timelineMetric, boolean isCounter) { + if (isCounter) { + transformMetricValuesToDerivative(timelineMetric); + } + putTimelineMetric(timelineMetric); + } +}
