http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java index 18a5a54..e1a0bb9 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java @@ -30,26 +30,27 @@ import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; public class OutputFile extends Output { - static Logger logger = Logger.getLogger(OutputFile.class); + private static final Logger LOG = Logger.getLogger(OutputFile.class); - PrintWriter outWriter = null; - String filePath = null; - String codec; + private PrintWriter outWriter; + private String filePath = null; + private String codec; @Override public void init() throws Exception { super.init(); filePath = getStringValue("path"); - if (filePath == null || filePath.isEmpty()) { - logger.error("Filepath config property <path> is not set in config file."); + if (StringUtils.isEmpty(filePath)) { + LOG.error("Filepath config property <path> is not set in config file."); return; } codec = getStringValue("codec"); - if (codec == null || codec.trim().isEmpty()) { + if (StringUtils.isBlank(codec)) { codec = "json"; } else { if (codec.trim().equalsIgnoreCase("csv")) { @@ -57,12 +58,11 @@ public class OutputFile extends Output { } else if (codec.trim().equalsIgnoreCase("json")) { codec = "csv"; } else { - logger.error("Unsupported codec type. codec=" + codec - + ", will use json"); + LOG.error("Unsupported codec type. codec=" + codec + ", will use json"); codec = "json"; } } - logger.info("Out filePath=" + filePath + ", codec=" + codec); + LOG.info("Out filePath=" + filePath + ", codec=" + codec); File outFile = new File(filePath); if (outFile.getParentFile() != null) { File parentDir = outFile.getParentFile(); @@ -71,16 +71,14 @@ public class OutputFile extends Output { } } - outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile, - true))); + outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile, true))); - logger.info("init() is successfull. filePath=" - + outFile.getAbsolutePath()); + LOG.info("init() is successfull. filePath=" + outFile.getAbsolutePath()); } @Override public void close() { - logger.info("Closing file." + getShortDescription()); + LOG.info("Closing file." + getShortDescription()); if (outWriter != null) { try { outWriter.close(); @@ -92,8 +90,7 @@ public class OutputFile extends Output { } @Override - public void write(Map<String, Object> jsonObj, InputMarker inputMarker) - throws Exception { + public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception { String outStr = null; CSVPrinter csvPrinter = null; try { @@ -104,7 +101,7 @@ public class OutputFile extends Output { outStr = LogFeederUtil.getGson().toJson(jsonObj); } if (outWriter != null && outStr != null) { - statMetric.count++; + statMetric.value++; outWriter.println(outStr); outWriter.flush(); @@ -122,7 +119,7 @@ public class OutputFile extends Output { @Override synchronized public void write(String block, InputMarker inputMarker) throws Exception { if (outWriter != null && block != null) { - statMetric.count++; + statMetric.value++; outWriter.println(block); outWriter.flush(); @@ -135,10 +132,7 @@ public class OutputFile extends Output { } @Override - public void copyFile(File inputFile, InputMarker inputMarker) - throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "copyFile method is not yet supported for output=file"); + public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException { + throw new UnsupportedOperationException("copyFile method is not yet supported for output=file"); } - }
http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index a360215..8f4b0b1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -43,7 +43,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; * The events are spooled on the local file system and uploaded in batches asynchronously. */ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition { - private final static Logger logger = Logger.getLogger(OutputHDFSFile.class); + private static final Logger LOG = Logger.getLogger(OutputHDFSFile.class); + private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>(); @@ -72,23 +73,20 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L; filenamePrefix = getStringValue("file_name_prefix", filenamePrefix); if (StringUtils.isEmpty(hdfsOutDir)) { - logger - .error("HDFS config property <hdfs_out_dir> is not set in config file."); + LOG.error("HDFS config property <hdfs_out_dir> is not set in config file."); return; } if (StringUtils.isEmpty(hdfsHost)) { - logger - .error("HDFS config property <hdfs_host> is not set in config file."); + LOG.error("HDFS config property <hdfs_host> is not set in config file."); return; } if (StringUtils.isEmpty(hdfsPort)) { - logger - .error("HDFS config property <hdfs_port> is not set in config file."); + LOG.error("HDFS config property <hdfs_port> is not set in config file."); return; } HashMap<String, String> contextParam = buildContextParam(); hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam); - logger.info("hdfs Output dir=" + hdfsOutDir); + LOG.info("hdfs Output dir=" + hdfsOutDir); String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/"; logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this); this.startHDFSCopyThread(); @@ -96,18 +94,17 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC @Override public void close() { - logger.info("Closing file." + getShortDescription()); + LOG.info("Closing file." + getShortDescription()); logSpooler.rollover(); this.stopHDFSCopyThread(); isClosed = true; } @Override - synchronized public void write(String block, InputMarker inputMarker) - throws Exception { + public synchronized void write(String block, InputMarker inputMarker) throws Exception { if (block != null) { logSpooler.add(block); - statMetric.count++; + statMetric.value++; } } @@ -127,24 +124,19 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC Iterator<File> localFileIterator = localReadyFiles.iterator(); while (localFileIterator.hasNext()) { File localFile = localFileIterator.next(); - fileSystem = LogfeederHDFSUtil.INSTANCE.buildFileSystem(hdfsHost, - hdfsPort); + fileSystem = LogfeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort); if (fileSystem != null && localFile.exists()) { String destFilePath = hdfsOutDir + "/" + localFile.getName(); String localPath = localFile.getAbsolutePath(); boolean overWrite = true; boolean delSrc = true; - boolean isCopied = LogfeederHDFSUtil.INSTANCE.copyFromLocal( - localFile.getAbsolutePath(), destFilePath, fileSystem, + boolean isCopied = LogfeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem, overWrite, delSrc); if (isCopied) { - logger.debug("File copy to hdfs hdfspath :" + destFilePath - + " and deleted local file :" + localPath); + LOG.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath); } else { - // TODO Need to write retry logic, in next release we can - // handle it - logger.error("Hdfs file copy failed for hdfspath :" - + destFilePath + " and localpath :" + localPath); + // TODO Need to write retry logic, in next release we can handle it + LOG.error("Hdfs file copy failed for hdfspath :" + destFilePath + " and localpath :" + localPath); } } localFileIterator.remove(); @@ -157,14 +149,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC } } } catch (InterruptedException e) { - logger.error(e.getLocalizedMessage(),e); + LOG.error(e.getLocalizedMessage(),e); } } } catch (Exception e) { - logger - .error( - "Exception in hdfsCopyThread errorMsg:" - + e.getLocalizedMessage(), e); + LOG.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e); } } }; @@ -174,24 +163,23 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC private void stopHDFSCopyThread() { if (hdfsCopyThread != null) { - logger.info("waiting till copy all local files to hdfs......."); + LOG.info("waiting till copy all local files to hdfs......."); while (!localReadyFiles.isEmpty()) { try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.error(e.getLocalizedMessage(), e); + LOG.error(e.getLocalizedMessage(), e); } - logger.debug("still waiting to copy all local files to hdfs......."); + LOG.debug("still waiting to copy all local files to hdfs......."); } - logger.info("calling interrupt method for hdfsCopyThread to stop it."); + LOG.info("calling interrupt method for hdfsCopyThread to stop it."); try { hdfsCopyThread.interrupt(); } catch (SecurityException exception) { - logger.error(" Current thread : '" + Thread.currentThread().getName() - + "' does not have permission to interrupt the Thread: '" - + hdfsCopyThread.getName() + "'"); + LOG.error(" Current thread : '" + Thread.currentThread().getName() + + "' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'"); } - LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem); + LogfeederHDFSUtil.closeFileSystem(fileSystem); } } @@ -208,15 +196,13 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC readyMonitor.notifyAll(); } } catch (Exception e) { - logger.error(e.getLocalizedMessage(),e); + LOG.error(e.getLocalizedMessage(),e); } } @Override - public void copyFile(File inputFile, InputMarker inputMarker) - throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "copyFile method is not yet supported for output=hdfs"); + public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException { + throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs"); } /** @@ -242,8 +228,8 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime(); boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis; if (shouldRollover) { - logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() + - " has crossed threshold (msecs) " + rolloverThresholdTimeMillis); + LOG.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() + + " has crossed threshold (msecs) " + rolloverThresholdTimeMillis); } return shouldRollover; } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java index 2595d87..52fc6f8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java @@ -56,6 +56,16 @@ public class OutputKafka extends Output { private boolean isKafkaBrokerUp = false; @Override + protected String getStatMetricName() { + return "output.kafka.write_logs"; + } + + @Override + protected String getWriteBytesMetricName() { + return "output.kafka.write_bytes"; + } + + @Override public void init() throws Exception { super.init(); Properties props = initProperties(); @@ -65,9 +75,6 @@ public class OutputKafka extends Output { } private Properties initProperties() throws Exception { - statMetric.metricsName = "output.kafka.write_logs"; - writeBytesMetric.metricsName = "output.kafka.write_bytes"; - String brokerList = getStringValue("broker_list"); if (StringUtils.isEmpty(brokerList)) { throw new Exception("For kafka output, bootstrap broker_list is needed"); @@ -124,17 +131,15 @@ public class OutputKafka extends Output { if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) { kafkaCallBack = null; } else { - LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for " - + FAILED_RETRY_INTERVAL + " seconds"); + LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for " + + FAILED_RETRY_INTERVAL + " seconds"); Thread.sleep(FAILED_RETRY_INTERVAL * 1000); } } catch (Throwable t) { String logMessageKey = this.getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, - "Error sending message to Kafka during retry. message=" - + (kafkaCallBack == null ? null : kafkaCallBack.message), - t, LOG, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending message to Kafka during retry. message=" + + (kafkaCallBack == null ? null : kafkaCallBack.message), t, LOG, Level.ERROR); } } @@ -160,8 +165,8 @@ public class OutputKafka extends Output { LOG.error("Kafka is down. Going to sleep for " + FAILED_RETRY_INTERVAL + " seconds"); Thread.sleep(FAILED_RETRY_INTERVAL * 1000); } else { - LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages=" - + failedMessages.size() + " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds"); + LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages=" + failedMessages.size() + + " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds"); Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000); } } catch (Throwable t) { @@ -198,16 +203,15 @@ public class OutputKafka extends Output { private boolean publishMessage(String block, InputMarker inputMarker) { if (isAsync && isKafkaBrokerUp) { // Send asynchronously - producer.send(new ProducerRecord<String, String>(topic, block), - new KafkaCallBack(this, block, inputMarker, ++messageCount)); + producer.send(new ProducerRecord<String, String>(topic, block), new KafkaCallBack(this, block, inputMarker, ++messageCount)); return true; } else { // Send synchronously try { // Not using key. Let it round robin RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, block)).get(); if (metadata != null) { - statMetric.count++; - writeBytesMetric.count += block.length(); + statMetric.value++; + writeBytesMetric.value += block.length(); } if (!isKafkaBrokerUp) { LOG.info("Started writing to kafka. " + getShortDescription()); @@ -217,18 +221,18 @@ public class OutputKafka extends Output { } catch (InterruptedException e) { isKafkaBrokerUp = false; String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_INTERRUPT"; - LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e, - LOG, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e, LOG, + Level.ERROR); } catch (ExecutionException e) { isKafkaBrokerUp = false; String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_EXECUTION"; - LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e, - LOG, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e, LOG, + Level.ERROR); } catch (Throwable t) { isKafkaBrokerUp = false; String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_WRITE_ERROR"; - LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t, - LOG, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t, LOG, + Level.ERROR); } } return false; @@ -260,12 +264,12 @@ public class OutputKafka extends Output { output.isKafkaBrokerUp = true; } output.incrementStat(1); - output.writeBytesMetric.count += message.length(); + output.writeBytesMetric.value += message.length(); } else { output.isKafkaBrokerUp = false; String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR"; - LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback", - exception, LOG, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback", exception, LOG, + Level.ERROR); output.failedMessages.add(this); } @@ -273,9 +277,7 @@ public class OutputKafka extends Output { } @Override - public void copyFile(File inputFile, InputMarker inputMarker) - throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "copyFile method is not yet supported for output=kafka"); + public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException { + throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka"); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java new file mode 100644 index 0000000..2c81c19 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.logconfig.FilterLogData; +import org.apache.ambari.logfeeder.metrics.MetricData; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.MurmurHash; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class OutputManager { + private static final Logger LOG = Logger.getLogger(OutputManager.class); + + private static final int HASH_SEED = 31174077; + private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1 + + private List<Output> outputs = new ArrayList<Output>(); + + private boolean addMessageMD5 = true; + + private static long docCounter = 0; + private MetricData messageTruncateMetric = new MetricData(null, false); + + public List<Output> getOutputs() { + return outputs; + } + + public void add(Output output) { + this.outputs.add(output); + } + + public void retainUsedOutputs(Collection<Output> usedOutputs) { + outputs.retainAll(usedOutputs); + } + + public void init() throws Exception { + for (Output output : outputs) { + output.init(); + } + } + + public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { + Input input = inputMarker.input; + + // Update the block with the context fields + for (Map.Entry<String, String> entry : input.getContextFields().entrySet()) { + if (jsonObj.get(entry.getKey()) == null) { + jsonObj.put(entry.getKey(), entry.getValue()); + } + } + + // TODO: Ideally most of the overrides should be configurable + + if (jsonObj.get("type") == null) { + jsonObj.put("type", input.getStringValue("type")); + } + if (jsonObj.get("path") == null && input.getFilePath() != null) { + jsonObj.put("path", input.getFilePath()); + } + if (jsonObj.get("path") == null && input.getStringValue("path") != null) { + jsonObj.put("path", input.getStringValue("path")); + } + if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) { + jsonObj.put("host", LogFeederUtil.hostName); + } + if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) { + jsonObj.put("ip", LogFeederUtil.ipAddress); + } + if (jsonObj.get("level") == null) { + jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN); + } + + if (input.isUseEventMD5() || input.isGenEventMD5()) { + String prefix = ""; + Object logtimeObj = jsonObj.get("logtime"); + if (logtimeObj != null) { + if (logtimeObj instanceof Date) { + prefix = "" + ((Date) logtimeObj).getTime(); + } else { + prefix = logtimeObj.toString(); + } + } + + Long eventMD5 = MurmurHash.hash64A(LogFeederUtil.getGson().toJson(jsonObj).getBytes(), HASH_SEED); + if (input.isGenEventMD5()) { + jsonObj.put("event_md5", prefix + eventMD5.toString()); + } + if (input.isUseEventMD5()) { + jsonObj.put("id", prefix + eventMD5.toString()); + } + } + + jsonObj.put("seq_num", new Long(docCounter++)); + if (jsonObj.get("id") == null) { + jsonObj.put("id", UUID.randomUUID().toString()); + } + if (jsonObj.get("event_count") == null) { + jsonObj.put("event_count", new Integer(1)); + } + if (inputMarker.lineNumber > 0) { + jsonObj.put("logfile_line_number", new Integer(inputMarker.lineNumber)); + } + if (jsonObj.containsKey("log_message")) { + // TODO: Let's check size only for log_message for now + String logMessage = (String) jsonObj.get("log_message"); + logMessage = truncateLongLogMessage(jsonObj, input, logMessage); + if (addMessageMD5) { + jsonObj.put("message_md5", "" + MurmurHash.hash64A(logMessage.getBytes(), 31174077)); + } + } + + if (FilterLogData.INSTANCE.isAllowed(jsonObj)) { + for (Output output : input.getOutputList()) { + try { + output.write(jsonObj, inputMarker); + } catch (Exception e) { + LOG.error("Error writing. to " + output.getShortDescription(), e); + } + } + } + } + + @SuppressWarnings("unchecked") + private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) { + if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { + messageTruncateMetric.value++; + String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length + + ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" + + StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN); + logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE); + jsonObj.put("log_message", logMessage); + List<String> tagsList = (List<String>) jsonObj.get("tags"); + if (tagsList == null) { + tagsList = new ArrayList<String>(); + jsonObj.put("tags", tagsList); + } + tagsList.add("error_message_truncated"); + } + return logMessage; + } + + public void write(String jsonBlock, InputMarker inputMarker) { + if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) { + for (Output output : inputMarker.input.getOutputList()) { + try { + output.write(jsonBlock, inputMarker); + } catch (Exception e) { + LOG.error("Error writing. to " + output.getShortDescription(), e); + } + } + } + } + + public void copyFile(File inputFile, InputMarker inputMarker) { + Input input = inputMarker.input; + for (Output output : input.getOutputList()) { + try { + output.copyFile(inputFile, inputMarker); + }catch (Exception e) { + LOG.error("Error coyping file . to " + output.getShortDescription(), e); + } + } + } + + public void logStats() { + for (Output output : outputs) { + output.logStat(); + } + LogFeederUtil.logStatForMetric(messageTruncateMetric, "Stat: Messages Truncated", ""); + } + + public void addMetricsContainers(List<MetricData> metricsList) { + metricsList.add(messageTruncateMetric); + for (Output output : outputs) { + output.addMetricsContainers(metricsList); + } + } + + public void close() { + LOG.info("Close called for outputs ..."); + for (Output output : outputs) { + try { + output.setDrain(true); + output.close(); + } catch (Exception e) { + // Ignore + } + } + + // Need to get this value from property + int iterations = 30; + int waitTimeMS = 1000; + for (int i = 0; i < iterations; i++) { + boolean allClosed = true; + for (Output output : outputs) { + if (!output.isClosed()) { + try { + allClosed = false; + LOG.warn("Waiting for output to close. " + output.getShortDescription() + ", " + (iterations - i) + " more seconds"); + Thread.sleep(waitTimeMS); + } catch (Throwable t) { + // Ignore + } + } + } + if (allClosed) { + LOG.info("All outputs are closed. Iterations=" + i); + return; + } + } + + LOG.warn("Some outpus were not closed after " + iterations + " iterations"); + for (Output output : outputs) { + if (!output.isClosed()) { + LOG.warn("Output not closed. Will ignore it." + output.getShortDescription() + ", pendingCound=" + output.getPendingCount()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java deleted file mode 100644 index 0a6b7fa..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.output; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.ambari.logfeeder.input.Input; -import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; -import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData; -import org.apache.ambari.logfeeder.metrics.MetricCount; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -public class OutputMgr { - private static final Logger logger = Logger.getLogger(OutputMgr.class); - - private Collection<Output> outputList = new ArrayList<Output>(); - - private boolean addMessageMD5 = true; - - private int MAX_OUTPUT_SIZE = 32765; // 32766-1 - private static long doc_counter = 0; - private MetricCount messageTruncateMetric = new MetricCount(); - - - public Collection<Output> getOutputList() { - return outputList; - } - - public void setOutputList(Collection<Output> outputList) { - this.outputList = outputList; - } - - public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { - Input input = inputMarker.input; - - // Update the block with the context fields - for (Map.Entry<String, String> entry : input.getContextFields() - .entrySet()) { - if (jsonObj.get(entry.getKey()) == null) { - jsonObj.put(entry.getKey(), entry.getValue()); - } - } - - // TODO: Ideally most of the overrides should be configurable - - // Add the input type - if (jsonObj.get("type") == null) { - jsonObj.put("type", input.getStringValue("type")); - } - if (jsonObj.get("path") == null && input.getFilePath() != null) { - jsonObj.put("path", input.getFilePath()); - } - if (jsonObj.get("path") == null && input.getStringValue("path") != null) { - jsonObj.put("path", input.getStringValue("path")); - } - - // Add host if required - if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) { - jsonObj.put("host", LogFeederUtil.hostName); - } - // Add IP if required - if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) { - jsonObj.put("ip", LogFeederUtil.ipAddress); - } - - //Add level - if (jsonObj.get("level") == null) { - jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN); - } - if (input.isUseEventMD5() || input.isGenEventMD5()) { - String prefix = ""; - Object logtimeObj = jsonObj.get("logtime"); - if (logtimeObj != null) { - if (logtimeObj instanceof Date) { - prefix = "" + ((Date) logtimeObj).getTime(); - } else { - prefix = logtimeObj.toString(); - } - } - Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson() - .toJson(jsonObj)); - if (input.isGenEventMD5()) { - jsonObj.put("event_md5", prefix + eventMD5.toString()); - } - if (input.isUseEventMD5()) { - jsonObj.put("id", prefix + eventMD5.toString()); - } - } - - // jsonObj.put("@timestamp", new Date()); - jsonObj.put("seq_num", new Long(doc_counter++)); - if (jsonObj.get("id") == null) { - jsonObj.put("id", UUID.randomUUID().toString()); - } - if (jsonObj.get("event_count") == null) { - jsonObj.put("event_count", new Integer(1)); - } - if (inputMarker.lineNumber > 0) { - jsonObj.put("logfile_line_number", new Integer( - inputMarker.lineNumber)); - } - if (jsonObj.containsKey("log_message")) { - // TODO: Let's check size only for log_message for now - String logMessage = (String) jsonObj.get("log_message"); - if (logMessage != null - && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { - messageTruncateMetric.count++; - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_MESSAGESIZE"; - LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, - "Message is too big. size=" - + logMessage.getBytes().length + ", input=" - + input.getShortDescription() - + ". Truncating to " + MAX_OUTPUT_SIZE - + ", first upto 100 characters=" - + LogFeederUtil.subString(logMessage, 100), - null, logger, Level.WARN); - logMessage = new String(logMessage.getBytes(), 0, - MAX_OUTPUT_SIZE); - jsonObj.put("log_message", logMessage); - // Add error tags - @SuppressWarnings("unchecked") - List<String> tagsList = (List<String>) jsonObj.get("tags"); - if (tagsList == null) { - tagsList = new ArrayList<String>(); - jsonObj.put("tags", tagsList); - } - tagsList.add("error_message_truncated"); - - } - if (addMessageMD5) { - jsonObj.put("message_md5", - "" + LogFeederUtil.genHash(logMessage)); - } - } - //check log is allowed to send output - if (FilterLogData.INSTANCE.isAllowed(jsonObj)) { - for (Output output : input.getOutputList()) { - try { - output.write(jsonObj, inputMarker); - } catch (Exception e) { - logger.error("Error writing. to " + output.getShortDescription(), e); - } - } - } - } - - public void write(String jsonBlock, InputMarker inputMarker) { - //check log is allowed to send output - if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) { - for (Output output : inputMarker.input.getOutputList()) { - try { - output.write(jsonBlock, inputMarker); - } catch (Exception e) { - logger.error("Error writing. to " + output.getShortDescription(), e); - } - } - } - } - - public void close() { - logger.info("Close called for outputs ..."); - for (Output output : outputList) { - try { - output.setDrain(true); - output.close(); - } catch (Exception e) { - // Ignore - } - } - // Need to get this value from property - int iterations = 30; - int waitTimeMS = 1000; - int i; - boolean allClosed = true; - for (i = 0; i < iterations; i++) { - allClosed = true; - for (Output output : outputList) { - if (!output.isClosed()) { - try { - allClosed = false; - logger.warn("Waiting for output to close. " - + output.getShortDescription() + ", " - + (iterations - i) + " more seconds"); - Thread.sleep(waitTimeMS); - } catch (Throwable t) { - // Ignore - } - } - } - if (allClosed) { - break; - } - } - - if (!allClosed) { - logger.warn("Some outpus were not closed. Iterations=" + i); - for (Output output : outputList) { - if (!output.isClosed()) { - logger.warn("Output not closed. Will ignore it." - + output.getShortDescription() + ", pendingCound=" - + output.getPendingCount()); - } - } - } else { - logger.info("All outputs are closed. Iterations=" + i); - } - } - - public void logStats() { - for (Output output : outputList) { - output.logStat(); - } - LogFeederUtil.logStatForMetric(messageTruncateMetric, - "Stat: Messages Truncated", null); - } - - public void addMetricsContainers(List<MetricCount> metricsList) { - metricsList.add(messageTruncateMetric); - for (Output output : outputList) { - output.addMetricsContainers(metricsList); - } - } - - - public void copyFile(File inputFile, InputMarker inputMarker) { - Input input = inputMarker.input; - for (Output output : input.getOutputList()) { - try { - output.copyFile(inputFile, inputMarker); - }catch (Exception e) { - logger.error("Error coyping file . to " + output.getShortDescription(), - e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index e95f8df..26f1ddb 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.ambari.logfeeder.LogFeeder; +import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.spool.LogSpooler; @@ -47,10 +48,10 @@ import java.util.Map.Entry; * </ul> */ public class OutputS3File extends Output implements RolloverCondition, RolloverHandler { + private static final Logger LOG = Logger.getLogger(OutputS3File.class); public static final String INPUT_ATTRIBUTE_TYPE = "type"; public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json"; - static private Logger logger = Logger.getLogger(OutputS3File.class); private LogSpooler logSpooler; private S3OutputConfiguration s3OutputConfiguration; @@ -72,23 +73,21 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH @Override public void copyFile(File inputFile, InputMarker inputMarker) { String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE); - S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, - S3Util.INSTANCE, false, type); - String resolvedPath = s3Uploader.uploadFile(inputFile, - inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE)); + S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type); + String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE)); uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath); } - private void uploadConfig(InputMarker inputMarker, String type, - S3OutputConfiguration s3OutputConfiguration, String resolvedPath) { + private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration, + String resolvedPath) { ArrayList<Map<String, Object>> filters = new ArrayList<>(); addFilters(filters, inputMarker.input.getFirstFilter()); Map<String, Object> inputConfig = new HashMap<>(); inputConfig.putAll(inputMarker.input.getConfigs()); - String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() - + S3Util.S3_PATH_SEPARATOR + resolvedPath; + String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() + + LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath; inputConfig.put("path", s3CompletePath); ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>(); @@ -117,17 +116,15 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH } } - private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix, - S3OutputConfiguration s3OutputConfiguration) { + private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) { Gson gson = new GsonBuilder().setPrettyPrinting().create(); String configJson = gson.toJson(configToWrite); - String s3ResolvedKey = new S3LogPathResolver(). - getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster()); + String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, + s3OutputConfiguration.getCluster()); - S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), - s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(), - s3OutputConfiguration.getS3SecretKey()); + S3Util.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey, + s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey()); } private String getComponentConfigFileName(String componentName) { @@ -136,7 +133,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH private Map<String, Object> getGlobalConfig() { - Map<String, Object> globalConfig = LogFeeder.globalMap; + Map<String, Object> globalConfig = LogFeeder.globalConfigs; if (globalConfig == null) { globalConfig = new HashMap<>(); } @@ -173,8 +170,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH globalConfig.put("copy_file", false); globalConfig.put("process_file", true); globalConfig.put("tail", false); - Map<String, Object> addFields = (Map<String, Object>) globalConfig - .get("add_fields"); + Map<String, Object> addFields = (Map<String, Object>) globalConfig.get("add_fields"); if (addFields == null) { addFields = new HashMap<>(); } @@ -216,7 +212,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH @VisibleForTesting protected S3Uploader createUploader(String logType) { - S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType); + S3Uploader uploader = new S3Uploader(s3OutputConfiguration, true, logType); uploader.startUploaderThread(); return uploader; } @@ -224,8 +220,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH @VisibleForTesting protected LogSpooler createSpooler(String filePath) { String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service"; - logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", - spoolDirectory, filePath)); + LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath)); return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this, s3OutputConfiguration.getRolloverTimeThresholdSecs()); } @@ -244,7 +239,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH long currentSize = spoolFile.length(); boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes()); if (result) { - logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize, + LOG.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize, s3OutputConfiguration.getRolloverSizeThresholdBytes())); } return result; http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index cd9ce4d..47f139d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -34,7 +34,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.ambari.logfeeder.input.InputMarker; -import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr; +import org.apache.ambari.logfeeder.logconfig.LogConfigHandler; +import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; @@ -76,7 +77,17 @@ public class OutputSolr extends Output { private BlockingQueue<OutputData> outgoingBuffer = null; private List<SolrWorkerThread> workerThreadList = new ArrayList<>(); - + + @Override + protected String getStatMetricName() { + return "output.solr.write_logs"; + } + + @Override + protected String getWriteBytesMetricName() { + return "output.solr.write_bytes"; + } + @Override public void init() throws Exception { super.init(); @@ -87,9 +98,6 @@ public class OutputSolr extends Output { } private void initParams() throws Exception { - statMetric.metricsName = "output.solr.write_logs"; - writeBytesMetric.metricsName = "output.solr.write_bytes"; - splitMode = getStringValue("splits_interval_mins", "none"); if (!splitMode.equalsIgnoreCase("none")) { splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL); @@ -204,10 +212,8 @@ public class OutputSolr extends Output { LOG.info("Ping to Solr server is successful for worker=" + count); } else { LOG.warn( - String.format( - "Ping to Solr server failed. It would check again. worker=%d, " - + "solrUrl=%s, zkConnectString=%s, collection=%s, response=%s", - count, solrUrl, zkConnectString, collection, response)); + String.format("Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkConnectString=%s, " + + "collection=%s, response=%s", count, solrUrl, zkConnectString, collection, response)); } } catch (Throwable t) { LOG.warn(String.format( @@ -223,7 +229,7 @@ public class OutputSolr extends Output { while (true) { LOG.info("Checking if config is available"); - if (FetchConfigFromSolr.isFilterAvailable()) { + if (LogConfigHandler.isFilterAvailable()) { LOG.info("Config is available"); return; } @@ -256,7 +262,7 @@ public class OutputSolr extends Output { private void useActualDateIfNeeded(Map<String, Object> jsonObj) { if (skipLogtime) { - jsonObj.put("logtime", LogFeederUtil.getActualDateStr()); + jsonObj.put("logtime", DateUtil.getActualDateStr()); } } @@ -324,7 +330,7 @@ public class OutputSolr extends Output { private final SolrClient solrClient; private final Collection<SolrInputDocument> localBuffer = new ArrayList<>(); - private final Map<String, InputMarker> latestInputMarkerList = new HashMap<>(); + private final Map<String, InputMarker> latestInputMarkers = new HashMap<>(); private long localBufferBytesSize = 0; @@ -352,17 +358,16 @@ public class OutputSolr extends Output { } } - if (localBuffer.size() > 0 && ((outputData == null && isDrain()) - || (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) { + if (localBuffer.size() > 0 && ((outputData == null && isDrain()) || + (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) { boolean response = sendToSolr(outputData); if( isDrain() && !response) { //Since sending to Solr response failed and it is in draining mode, let's break; - LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" - + getShortDescription()); + LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" + getShortDescription()); break; } } - if( localBuffer.size() == 0 ) { + if (localBuffer.size() == 0) { //If localBuffer is empty, then reset the timer lastDispatchTime = currTimeMS; } @@ -403,8 +408,7 @@ public class OutputSolr extends Output { } catch (IOException | SolrException exception) { // Transient error, lets block till it is available try { - LOG.warn("Solr is not reachable. Going to retry after " - + RETRY_INTERVAL + " seconds. " + "output=" + LOG.warn("Solr is not reachable. Going to retry after " + RETRY_INTERVAL + " seconds. " + "output=" + getShortDescription(), exception); Thread.sleep(RETRY_INTERVAL * 1000); } catch (Throwable t) { @@ -414,8 +418,8 @@ public class OutputSolr extends Output { // Something unknown happened. Let's not block because of this error. // Clear the buffer String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, - "Error sending log message to server. Dropping logs", serverException, LOG, Level.ERROR); + LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending log message to server. Dropping logs", + serverException, LOG, Level.ERROR); resetLocalBuffer(); break; } @@ -447,7 +451,7 @@ public class OutputSolr extends Output { Level.ERROR); } } - latestInputMarkerList.put(outputData.inputMarker.base64FileKey, outputData.inputMarker); + latestInputMarkers.put(outputData.inputMarker.base64FileKey, outputData.inputMarker); localBuffer.add(document); } @@ -479,9 +483,9 @@ public class OutputSolr extends Output { LogFeederUtil.logErrorMessageByInterval(logMessageKey, String.format("Error writing to Solr. response=%s, log=%s", response, outputData), null, LOG, Level.ERROR); } - statMetric.count += localBuffer.size(); - writeBytesMetric.count += localBufferBytesSize; - for (InputMarker inputMarker : latestInputMarkerList.values()) { + statMetric.value += localBuffer.size(); + writeBytesMetric.value += localBufferBytesSize; + for (InputMarker inputMarker : latestInputMarkers.values()) { inputMarker.input.checkIn(inputMarker); } } @@ -499,7 +503,7 @@ public class OutputSolr extends Output { public void resetLocalBuffer() { localBuffer.clear(); localBufferBytesSize = 0; - latestInputMarkerList.clear(); + latestInputMarkers.clear(); } public boolean isDone() { @@ -512,9 +516,7 @@ public class OutputSolr extends Output { } @Override - public void copyFile(File inputFile, InputMarker inputMarker) - throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "copyFile method is not yet supported for output=solr"); + public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException { + throw new UnsupportedOperationException("copyFile method is not yet supported for output=solr"); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java index 58282e0..8c544cf 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,9 +18,9 @@ package org.apache.ambari.logfeeder.output; +import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; -import org.apache.ambari.logfeeder.util.S3Util; import java.util.HashMap; @@ -40,7 +40,7 @@ public class S3LogPathResolver { public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) { HashMap<String, String> contextParam = buildContextParam(cluster); String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam); - return resolvedKeyPrefix + S3Util.S3_PATH_SEPARATOR + keySuffix; + return resolvedKeyPrefix + LogFeederConstants.S3_PATH_SEPARATOR + keySuffix; } private HashMap<String, String> buildContextParam(String cluster) { http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java index 485b0d4..e5974c5 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -103,8 +103,7 @@ public class S3OutputConfiguration { }; for (int i = 0; i < longValuedKeysToCopy.length; i++) { - configs.put(longValuedKeysToCopy[i], - configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i])); + configs.put(longValuedKeysToCopy[i], configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i])); } configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY)); http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java index fd59c51..e95a663 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,8 +18,12 @@ package org.apache.ambari.logfeeder.output; +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; import com.google.common.annotations.VisibleForTesting; +import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.util.CompressionUtil; import org.apache.ambari.logfeeder.util.S3Util; import org.apache.log4j.Logger; @@ -39,20 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean; * {@link org.apache.ambari.logfeeder.input.InputFile}. */ public class S3Uploader implements Runnable { + private static final Logger LOG = Logger.getLogger(S3Uploader.class); + public static final String POISON_PILL = "POISON-PILL"; - private static Logger logger = Logger.getLogger(S3Uploader.class); private final S3OutputConfiguration s3OutputConfiguration; - private final S3Util s3UtilInstance; private final boolean deleteOnEnd; - private String logType; + private final String logType; private final BlockingQueue<String> fileContextsToUpload; - private AtomicBoolean stopRunningThread = new AtomicBoolean(false); + private final AtomicBoolean stopRunningThread = new AtomicBoolean(false); - public S3Uploader(S3OutputConfiguration s3OutputConfiguration, S3Util s3UtilInstance, boolean deleteOnEnd, - String logType) { + public S3Uploader(S3OutputConfiguration s3OutputConfiguration, boolean deleteOnEnd, String logType) { this.s3OutputConfiguration = s3OutputConfiguration; - this.s3UtilInstance = s3UtilInstance; this.deleteOnEnd = deleteOnEnd; this.logType = logType; this.fileContextsToUpload = new LinkedBlockingQueue<>(); @@ -81,7 +83,7 @@ public class S3Uploader implements Runnable { stopRunningThread.set(true); boolean offerStatus = fileContextsToUpload.offer(POISON_PILL); if (!offerStatus) { - logger.warn("Could not add poison pill to interrupt uploader thread."); + LOG.warn("Could not add poison pill to interrupt uploader thread."); } } @@ -92,7 +94,7 @@ public class S3Uploader implements Runnable { void addFileForUpload(String fileToUpload) { boolean offerStatus = fileContextsToUpload.offer(fileToUpload); if (!offerStatus) { - logger.error("Could not add file " + fileToUpload + " for upload."); + LOG.error("Could not add file " + fileToUpload + " for upload."); } } @@ -102,12 +104,12 @@ public class S3Uploader implements Runnable { try { String fileNameToUpload = fileContextsToUpload.take(); if (POISON_PILL.equals(fileNameToUpload)) { - logger.warn("Found poison pill while waiting for files to upload, exiting"); + LOG.warn("Found poison pill while waiting for files to upload, exiting"); return; } uploadFile(new File(fileNameToUpload), logType); } catch (InterruptedException e) { - logger.error("Interrupted while waiting for elements from fileContextsToUpload", e); + LOG.error("Interrupted while waiting for elements from fileContextsToUpload", e); return; } } @@ -130,34 +132,44 @@ public class S3Uploader implements Runnable { String compressionAlgo = s3OutputConfiguration.getCompressionAlgo(); String keySuffix = fileToUpload.getName() + "." + compressionAlgo; - String s3Path = new S3LogPathResolver(). - getResolvedPath(s3OutputConfiguration.getS3Path()+S3Util.S3_PATH_SEPARATOR+logType, - keySuffix, s3OutputConfiguration.getCluster()); - logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", - s3OutputConfiguration.getS3Path(), keySuffix, s3Path)); + String s3Path = new S3LogPathResolver().getResolvedPath( + s3OutputConfiguration.getS3Path() + LogFeederConstants.S3_PATH_SEPARATOR + logType, keySuffix, + s3OutputConfiguration.getCluster()); + LOG.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", s3OutputConfiguration.getS3Path(), keySuffix, s3Path)); File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo); - logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path); - s3UtilInstance.uploadFileTos3(bucketName, s3Path, sourceFile, s3AccessKey, - s3SecretKey); + LOG.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path); + uploadFileToS3(bucketName, s3Path, sourceFile, s3AccessKey, s3SecretKey); // delete local compressed file sourceFile.delete(); if (deleteOnEnd) { - logger.info("Deleting input file as required"); + LOG.info("Deleting input file as required"); if (!fileToUpload.delete()) { - logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3"); + LOG.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3"); } } return s3Path; } @VisibleForTesting + protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) { + TransferManager transferManager = S3Util.getTransferManager(accessKey, secretKey); + try { + Upload upload = transferManager.upload(bucketName, s3Key, localFile); + upload.waitForUploadResult(); + } catch (AmazonClientException | InterruptedException e) { + LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), e); + } finally { + S3Util.shutdownTransferManager(transferManager); + } + } + + @VisibleForTesting protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { - File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" - + new Date().getTime() + "." + compressionAlgo); - outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, - compressionAlgo); + File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + new Date().getTime() + + "." + compressionAlgo); + outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, compressionAlgo); return outputFile; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java index fb263ba..1f13357 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -39,8 +39,9 @@ import java.util.concurrent.atomic.AtomicBoolean; * {@link RolloverHandler} to trigger the handling of the rolled over file. */ public class LogSpooler { + + private static final Logger LOG = Logger.getLogger(LogSpooler.class); public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0; - static private Logger logger = Logger.getLogger(LogSpooler.class); static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss"; private String spoolDirectory; @@ -98,7 +99,7 @@ public class LogSpooler { private void initializeSpoolDirectory() { File spoolDir = new File(spoolDirectory); if (!spoolDir.exists()) { - logger.info("Creating spool directory: " + spoolDir); + LOG.info("Creating spool directory: " + spoolDir); boolean result = spoolDir.mkdirs(); if (!result) { throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory); @@ -116,7 +117,7 @@ public class LogSpooler { + ", error message: " + e.getLocalizedMessage(), e); } currentSpoolerContext = new LogSpoolerContext(currentSpoolFile); - logger.info("Initialized spool file at path: " + currentSpoolFile); + LOG.info("Initialized spool file at path: " + currentSpoolFile); } @VisibleForTesting @@ -141,7 +142,7 @@ public class LogSpooler { currentSpoolBufferedWriter.println(logEvent); currentSpoolerContext.logEventSpooled(); if (rolloverCondition.shouldRollover(currentSpoolerContext)) { - logger.info("Trying to rollover based on rollover condition"); + LOG.info("Trying to rollover based on rollover condition"); tryRollover(); } } @@ -154,19 +155,19 @@ public class LogSpooler { * rolled over file. */ public void rollover() { - logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile); + LOG.info("Rollover condition detected, rolling over file: " + currentSpoolFile); currentSpoolBufferedWriter.flush(); if (currentSpoolFile.length()==0) { - logger.info("No data in file " + currentSpoolFile + ", not doing rollover"); + LOG.info("No data in file " + currentSpoolFile + ", not doing rollover"); } else { currentSpoolBufferedWriter.close(); rolloverHandler.handleRollover(currentSpoolFile); - logger.info("Invoked rollover handler with file: " + currentSpoolFile); + LOG.info("Invoked rollover handler with file: " + currentSpoolFile); initializeSpoolState(); } boolean status = rolloverInProgress.compareAndSet(true, false); if (!status) { - logger.error("Should have reset rollover flag!!"); + LOG.error("Should have reset rollover flag!!"); } } @@ -174,7 +175,7 @@ public class LogSpooler { if (rolloverInProgress.compareAndSet(false, true)) { rollover(); } else { - logger.warn("Ignoring rollover call as rollover already in progress for file " + + LOG.warn("Ignoring rollover call as rollover already in progress for file " + currentSpoolFile); } } @@ -197,7 +198,7 @@ public class LogSpooler { private class LogSpoolerRolloverTimerTask extends TimerTask { @Override public void run() { - logger.info("Trying rollover based on time"); + LOG.info("Trying rollover based on time"); tryRollover(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java index 084d6a2..616300f 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java index 1e12fb7..14bb139 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java index 8279645..48ace11 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java index 11308e4..2ec2708 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java index 15f7594..f814a92 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java @@ -20,62 +20,20 @@ package org.apache.ambari.logfeeder.util; import org.apache.log4j.Logger; -import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient; -public enum AWSUtil { - INSTANCE; +public class AWSUtil { private static final Logger LOG = Logger.getLogger(AWSUtil.class); - public String getAwsUserName(String accessKey, String secretKey) { - String username = null; - AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey); - AmazonIdentityManagementClient amazonIdentityManagementClient; - if (awsCredentials != null) { - amazonIdentityManagementClient = new AmazonIdentityManagementClient( - awsCredentials); - } else { - // create default client - amazonIdentityManagementClient = new AmazonIdentityManagementClient(); - } - try { - username = amazonIdentityManagementClient.getUser().getUser() - .getUserName(); - } catch (AmazonServiceException e) { - if (e.getErrorCode().compareTo("AccessDenied") == 0) { - String arn = null; - String msg = e.getMessage(); - int arnIdx = msg.indexOf("arn:aws"); - if (arnIdx != -1) { - int arnSpace = msg.indexOf(" ", arnIdx); - // should be similar to "arn:aws:iam::111111111111:user/username" - arn = msg.substring(arnIdx, arnSpace); - } - if (arn != null) { - String[] arnParts = arn.split(":"); - if (arnParts != null && arnParts.length > 5) { - username = arnParts[5]; - if (username != null) { - username = username.replace("user/", ""); - } - } - } - } - } catch (Exception exception) { - LOG.error( - "Error in getting username :" + exception.getLocalizedMessage(), - exception.getCause()); - } - return username; + private AWSUtil() { + throw new UnsupportedOperationException(); } - public AWSCredentials createAWSCredentials(String accessKey, String secretKey) { + public static AWSCredentials createAWSCredentials(String accessKey, String secretKey) { if (accessKey != null && secretKey != null) { LOG.debug("Creating aws client as per new accesskey and secretkey"); - AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, - secretKey); + AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); return awsCredentials; } else { return null; http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java index a92ba29..5049b62 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java @@ -21,69 +21,90 @@ package org.apache.ambari.logfeeder.util; import java.io.File; import java.util.HashMap; +import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.mapper.Mapper; +import org.apache.ambari.logfeeder.output.Output; +import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; public class AliasUtil { - private static Logger logger = Logger.getLogger(AliasUtil.class); + private static final Logger LOG = Logger.getLogger(AliasUtil.class); - private static AliasUtil instance = null; + private static final String ALIAS_CONFIG_JSON = "alias_config.json"; + private static HashMap<String, Object> aliasMap = null; - private static String aliasConfigJson = "alias_config.json"; - - private HashMap<String, Object> aliasMap = null; - - public static enum ALIAS_TYPE { - INPUT, FILTER, MAPPER, OUTPUT + static { + File jsonFile = FileUtil.getFileFromClasspath(ALIAS_CONFIG_JSON); + if (jsonFile != null) { + aliasMap = FileUtil.readJsonFromFile(jsonFile); + } } - public static enum ALIAS_PARAM { - KLASS + public static enum AliasType { + INPUT, FILTER, MAPPER, OUTPUT } private AliasUtil() { - init(); + throw new UnsupportedOperationException(); } - public static AliasUtil getInstance() { - if (instance == null) { - synchronized (AliasUtil.class) { - if (instance == null) { - instance = new AliasUtil(); - } - } + public static Object getClassInstance(String key, AliasType aliasType) { + String classFullName = getClassFullName(key, aliasType); + + Object instance = null; + try { + instance = (Object) Class.forName(classFullName).getConstructor().newInstance(); + } catch (Exception exception) { + LOG.error("Unsupported class = " + classFullName, exception.getCause()); } - return instance; - } - /** - */ - private void init() { - File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson); - if (jsonFile != null) { - this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile); + if (instance != null) { + boolean isValid = false; + switch (aliasType) { + case FILTER: + isValid = Filter.class.isAssignableFrom(instance.getClass()); + break; + case INPUT: + isValid = Input.class.isAssignableFrom(instance.getClass()); + break; + case OUTPUT: + isValid = Output.class.isAssignableFrom(instance.getClass()); + break; + case MAPPER: + isValid = Mapper.class.isAssignableFrom(instance.getClass()); + break; + default: + LOG.warn("Unhandled aliasType: " + aliasType); + isValid = true; + } + if (!isValid) { + LOG.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name()); + } } - + return instance; } - - public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) { - String result = key;// key as a default value; + private static String getClassFullName(String key, AliasType aliastype) { + String className = null;// key as a default value; + HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype); - String value = aliasInfo.get(aliasParam.name().toLowerCase()); - if (value != null && !value.isEmpty()) { - result = value; - logger.debug("Alias found for key :" + key + ", param :" + aliasParam.name().toLowerCase() + ", value :" - + value + " aliastype:" + aliastype.name()); + String value = aliasInfo.get("klass"); + if (!StringUtils.isEmpty(value)) { + className = value; + LOG.debug("Class name found for key :" + key + ", class name :" + className + " aliastype:" + aliastype.name()); } else { - logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase()); + LOG.debug("Class name not found for key :" + key + " aliastype:" + aliastype.name()); } - return result; + + return className; } @SuppressWarnings("unchecked") - private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) { - HashMap<String, String> aliasInfo = null; + private static HashMap<String, String> getAliasInfo(String key, AliasType aliastype) { + HashMap<String, String> aliasInfo = new HashMap<String, String>(); + if (aliasMap != null) { String typeKey = aliastype.name().toLowerCase(); HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey); @@ -91,9 +112,7 @@ public class AliasUtil { aliasInfo = (HashMap<String, String>) typeJson.get(key); } } - if (aliasInfo == null) { - aliasInfo = new HashMap<String, String>(); - } + return aliasInfo; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java index c2addbd..c460ab3 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java @@ -37,25 +37,20 @@ public class CompressionUtil { FileInputStream ios = null; try { if (!inputFile.exists()) { - throw new IllegalArgumentException("Input File:" - + inputFile.getAbsolutePath() + " is not exist."); + throw new IllegalArgumentException("Input File:" + inputFile.getAbsolutePath() + " is not exist."); } if (inputFile.isDirectory()) { - throw new IllegalArgumentException("Input File:" - + inputFile.getAbsolutePath() + " is a directory."); + throw new IllegalArgumentException("Input File:" + inputFile.getAbsolutePath() + " is a directory."); } File parent = outputFile.getParentFile(); if (parent != null && !parent.exists()) { boolean isParentCreated = parent.mkdirs(); if (!isParentCreated) { - throw new IllegalAccessException( - "User does not have permission to create parent directory :" - + parent.getAbsolutePath()); + throw new IllegalAccessException( "User does not have permission to create parent directory :" + parent.getAbsolutePath()); } } - final OutputStream out = new FileOutputStream(outputFile); - cos = new CompressorStreamFactory().createCompressorOutputStream( - algoName, out); + OutputStream out = new FileOutputStream(outputFile); + cos = new CompressorStreamFactory().createCompressorOutputStream(algoName, out); ios = new FileInputStream(inputFile); IOUtils.copy(ios, cos); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java index 2ca9353..6321e17 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java @@ -20,12 +20,17 @@ package org.apache.ambari.logfeeder.util; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.TimeZone; import org.apache.log4j.Logger; public class DateUtil { - private static final Logger logger = Logger.getLogger(DateUtil.class); - + private static final Logger LOG = Logger.getLogger(DateUtil.class); + + private DateUtil() { + throw new UnsupportedOperationException(); + } + public static String dateToString(Date date, String dateFormat) { if (date == null || dateFormat == null || dateFormat.isEmpty()) { return ""; @@ -34,8 +39,36 @@ public class DateUtil { SimpleDateFormat formatter = new SimpleDateFormat(dateFormat); return formatter.format(date).toString(); } catch (Exception e) { - logger.error("Error in coverting dateToString format :" + dateFormat, e); + LOG.error("Error in coverting dateToString format :" + dateFormat, e); } return ""; } + + private final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + return sdf; + } + }; + + public static String getDate(String timeStampStr) { + try { + return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr))); + } catch (Exception ex) { + LOG.error(ex); + return null; + } + } + + public static String getActualDateStr() { + try { + return dateFormatter.get().format(new Date()); + } catch (Exception ex) { + LOG.error(ex); + return null; + } + } }
