http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java new file mode 100644 index 0000000..8a07602 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.common; + +public class LogfeederException extends Exception { + + public LogfeederException(String message, Throwable throwable) { + super(message, throwable); + } + + public LogfeederException(String message) { + super(message); + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java deleted file mode 100644 index c22b512..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.exception; - -public class LogfeederException extends Exception { - - public LogfeederException(String message, Throwable throwable) { - super(message, throwable); - } - - public LogfeederException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java index 01d4f79..ab371f1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java @@ -24,17 +24,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.AliasUtil; -import org.apache.ambari.logfeeder.ConfigBlock; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.MetricCount; -import org.apache.ambari.logfeeder.OutputMgr; -import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM; -import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE; -import org.apache.ambari.logfeeder.exception.LogfeederException; +import org.apache.ambari.logfeeder.common.ConfigBlock; +import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.mapper.Mapper; +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.output.OutputMgr; +import org.apache.ambari.logfeeder.util.AliasUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM; +import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE; import org.apache.log4j.Logger; import org.apache.log4j.Priority; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 7aa649d..372c208 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -34,10 +34,10 @@ import java.util.regex.Pattern; import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.exception.GrokException; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.MetricCount; -import org.apache.ambari.logfeeder.exception.LogfeederException; +import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java index f375374..2954106 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java @@ -20,9 +20,9 @@ package org.apache.ambari.logfeeder.filter; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.exception.LogfeederException; +import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; public class FilterJSON extends Filter { http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java index 1b8b3a3..7adb468 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -23,10 +23,10 @@ import java.util.List; import java.util.Map; import java.util.StringTokenizer; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.MetricCount; -import org.apache.ambari.logfeeder.exception.LogfeederException; +import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 76af16c..5feb9c4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -26,13 +26,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.ConfigBlock; -import org.apache.ambari.logfeeder.InputMgr; -import org.apache.ambari.logfeeder.MetricCount; -import org.apache.ambari.logfeeder.OutputMgr; -import org.apache.ambari.logfeeder.exception.LogfeederException; +import org.apache.ambari.logfeeder.common.ConfigBlock; +import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.metrics.MetricCount; import org.apache.ambari.logfeeder.output.Output; +import org.apache.ambari.logfeeder.output.OutputMgr; import org.apache.log4j.Logger; public abstract class Input extends ConfigBlock implements Runnable { http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index 9d3545e..c9f5ded 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -33,8 +33,8 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java new file mode 100644 index 0000000..b18c9b0 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.input; + +import java.io.EOFException; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.log4j.Logger; +import org.apache.solr.common.util.Base64; + +public class InputMgr { + private static final Logger logger = Logger.getLogger(InputMgr.class); + + private List<Input> inputList = new ArrayList<Input>(); + private Set<Input> notReadyList = new HashSet<Input>(); + + private boolean isDrain = false; + private boolean isAnyInputTail = false; + + private String checkPointSubFolderName = "logfeeder_checkpoints"; + private File checkPointFolderFile = null; + + private MetricCount filesCountMetric = new MetricCount(); + + private String checkPointExtension = ".cp"; + + private Thread inputIsReadyMonitor = null; + + public List<Input> getInputList() { + return inputList; + } + + public void add(Input input) { + inputList.add(input); + } + + public void removeInput(Input input) { + logger.info("Trying to remove from inputList. " + + input.getShortDescription()); + Iterator<Input> iter = inputList.iterator(); + while (iter.hasNext()) { + Input iterInput = iter.next(); + if (iterInput.equals(input)) { + logger.info("Removing Input from inputList. " + + input.getShortDescription()); + iter.remove(); + } + } + } + + public int getActiveFilesCount() { + int count = 0; + for (Input input : inputList) { + if (input.isReady()) { + count++; + } + } + return count; + } + + public void init() { + filesCountMetric.metricsName = "input.files.count"; + filesCountMetric.isPointInTime = true; + + checkPointExtension = LogFeederUtil.getStringProperty( + "logfeeder.checkpoint.extension", checkPointExtension); + for (Input input : inputList) { + try { + input.init(); + if (input.isTail()) { + isAnyInputTail = true; + } + } catch (Exception e) { + logger.error( + "Error initializing input. " + + input.getShortDescription(), e); + } + } + + if (isAnyInputTail) { + logger.info("Determining valid checkpoint folder"); + boolean isCheckPointFolderValid = false; + // We need to keep track of the files we are reading. + String checkPointFolder = LogFeederUtil + .getStringProperty("logfeeder.checkpoint.folder"); + if (checkPointFolder != null && !checkPointFolder.isEmpty()) { + checkPointFolderFile = new File(checkPointFolder); + isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); + } + if (!isCheckPointFolderValid) { + // Let's try home folder + String userHome = LogFeederUtil.getStringProperty("user.home"); + if (userHome != null) { + checkPointFolderFile = new File(userHome, + checkPointSubFolderName); + logger.info("Checking if home folder can be used for checkpoints. Folder=" + + checkPointFolderFile); + isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); + } + } + if (!isCheckPointFolderValid) { + // Let's use tmp folder + String tmpFolder = LogFeederUtil + .getStringProperty("java.io.tmpdir"); + if (tmpFolder == null) { + tmpFolder = "/tmp"; + } + checkPointFolderFile = new File(tmpFolder, + checkPointSubFolderName); + logger.info("Checking if tmps folder can be used for checkpoints. Folder=" + + checkPointFolderFile); + isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); + if (isCheckPointFolderValid) { + logger.warn("Using tmp folder " + + checkPointFolderFile + + " to store check points. This is not recommended." + + "Please set logfeeder.checkpoint.folder property"); + } + } + + if (isCheckPointFolderValid) { + logger.info("Using folder " + checkPointFolderFile + + " for storing checkpoints"); + } + } + + } + + public File getCheckPointFolderFile() { + return checkPointFolderFile; + } + + private boolean verifyCheckPointFolder(File folderPathFile) { + if (!folderPathFile.exists()) { + // Create the folder + try { + if (!folderPathFile.mkdir()) { + logger.warn("Error creating folder for check point. folder=" + + folderPathFile); + } + } catch (Throwable t) { + logger.warn("Error creating folder for check point. folder=" + + folderPathFile, t); + } + } + + if (folderPathFile.exists() && folderPathFile.isDirectory()) { + // Let's check whether we can create a file + File testFile = new File(folderPathFile, UUID.randomUUID() + .toString()); + try { + testFile.createNewFile(); + return testFile.delete(); + } catch (IOException e) { + logger.warn( + "Couldn't create test file in " + + folderPathFile.getAbsolutePath() + + " for checkPoint", e); + } + } + return false; + } + + public void monitor() { + for (Input input : inputList) { + if (input.isReady()) { + input.monitor(); + } else { + if (input.isTail()) { + logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. " + + input.getShortDescription()); + notReadyList.add(input); + } else { + logger.info("Input is not ready, so going to ignore it " + + input.getShortDescription()); + } + } + } + // Start the monitoring thread if any file is in tail mode + if (isAnyInputTail) { + inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { + @Override + public void run() { + logger.info("Going to monitor for these missing files: " + + notReadyList.toString()); + while (true) { + if (isDrain) { + logger.info("Exiting missing file monitor."); + break; + } + try { + Iterator<Input> iter = notReadyList.iterator(); + while (iter.hasNext()) { + Input input = iter.next(); + try { + if (input.isReady()) { + input.monitor(); + iter.remove(); + } + } catch (Throwable t) { + logger.error("Error while enabling monitoring for input. " + + input.getShortDescription()); + } + } + Thread.sleep(30 * 1000); + } catch (Throwable t) { + // Ignore + } + } + } + }; + inputIsReadyMonitor.start(); + } + } + + public void addToNotReady(Input notReadyInput) { + notReadyList.add(notReadyInput); + } + + public void addMetricsContainers(List<MetricCount> metricsList) { + for (Input input : inputList) { + input.addMetricsContainers(metricsList); + } + filesCountMetric.count = getActiveFilesCount(); + metricsList.add(filesCountMetric); + } + + public void logStats() { + for (Input input : inputList) { + input.logStat(); + } + + filesCountMetric.count = getActiveFilesCount(); + LogFeederUtil.logStatForMetric(filesCountMetric, + "Stat: Files Monitored Count", null); + } + + public void close() { + for (Input input : inputList) { + try { + input.setDrain(true); + } catch (Throwable t) { + logger.error( + "Error while draining. input=" + + input.getShortDescription(), t); + } + } + isDrain = true; + + // Need to get this value from property + int iterations = 30; + int waitTimeMS = 1000; + int i = 0; + boolean allClosed = true; + for (i = 0; i < iterations; i++) { + allClosed = true; + for (Input input : inputList) { + if (!input.isClosed()) { + try { + allClosed = false; + logger.warn("Waiting for input to close. " + + input.getShortDescription() + ", " + + (iterations - i) + " more seconds"); + Thread.sleep(waitTimeMS); + } catch (Throwable t) { + // Ignore + } + } + } + if (allClosed) { + break; + } + } + if (!allClosed) { + logger.warn("Some inputs were not closed. Iterations=" + i); + for (Input input : inputList) { + if (!input.isClosed()) { + logger.warn("Input not closed. Will ignore it." + + input.getShortDescription()); + } + } + } else { + logger.info("All inputs are closed. Iterations=" + i); + } + + } + + public void checkInAll() { + for (Input input : inputList) { + input.checkIn(); + } + } + + public void cleanCheckPointFiles() { + + if (checkPointFolderFile == null) { + logger.info("Will not clean checkPoint files. checkPointFolderFile=" + + checkPointFolderFile); + return; + } + logger.info("Cleaning checkPoint files. checkPointFolderFile=" + + checkPointFolderFile.getAbsolutePath()); + try { + // Loop over the check point files and if filePath is not present, then move to closed + String searchPath = "*" + checkPointExtension; + FileFilter fileFilter = new WildcardFileFilter(searchPath); + File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter); + int totalCheckFilesDeleted = 0; + for (File checkPointFile : checkPointFiles) { + RandomAccessFile checkPointReader = null; + try { + checkPointReader = new RandomAccessFile(checkPointFile, "r"); + + int contentSize = checkPointReader.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointReader.read(b, 0, contentSize); + if (readSize != contentSize) { + logger.error("Couldn't read expected number of bytes from checkpoint file. expected=" + + contentSize + + ", read=" + + readSize + + ", checkPointFile=" + checkPointFile); + } else { + // Create JSON string + String jsonCheckPointStr = new String(b, 0, readSize); + Map<String, Object> jsonCheckPoint = LogFeederUtil + .toJSONObject(jsonCheckPointStr); + + String logFilePath = (String) jsonCheckPoint + .get("file_path"); + String logFileKey = (String) jsonCheckPoint + .get("file_key"); + if (logFilePath != null && logFileKey != null) { + boolean deleteCheckPointFile = false; + File logFile = new File(logFilePath); + if (logFile.exists()) { + Object fileKeyObj = InputFile + .getFileKey(logFile); + String fileBase64 = Base64 + .byteArrayToBase64(fileKeyObj + .toString().getBytes()); + if (!logFileKey.equals(fileBase64)) { + deleteCheckPointFile = true; + logger.info("CheckPoint clean: File key has changed. old=" + + logFileKey + + ", new=" + + fileBase64 + + ", filePath=" + + logFilePath + + ", checkPointFile=" + + checkPointFile.getAbsolutePath()); + } + } else { + logger.info("CheckPoint clean: Log file doesn't exist. filePath=" + + logFilePath + + ", checkPointFile=" + + checkPointFile.getAbsolutePath()); + deleteCheckPointFile = true; + } + if (deleteCheckPointFile) { + logger.info("Deleting CheckPoint file=" + + checkPointFile.getAbsolutePath() + + ", logFile=" + logFilePath); + checkPointFile.delete(); + totalCheckFilesDeleted++; + } + } + } + } catch (EOFException eof) { + logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + + checkPointFile); + } catch (Throwable t) { + logger.error("Error while checking checkPoint file. " + + checkPointFile, t); + } finally { + if (checkPointReader != null) { + try { + checkPointReader.close(); + } catch (Throwable t) { + logger.error("Error closing checkPoint file. " + + checkPointFile, t); + } + } + } + } + logger.info("Deleted " + totalCheckFilesDeleted + + " checkPoint file(s). checkPointFolderFile=" + + checkPointFolderFile.getAbsolutePath()); + + } catch (Throwable t) { + logger.error("Error while cleaning checkPointFiles", t); + } + } + + public void waitOnAllInputs() { + //wait on inputs + if (inputList != null) { + for (Input input : inputList) { + if (input != null) { + Thread inputThread = input.getThread(); + if (inputThread != null) { + try { + inputThread.join(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + } + // wait on monitor + if (inputIsReadyMonitor != null) { + try { + this.close(); + inputIsReadyMonitor.join(); + } catch (InterruptedException e) { + // ignore + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index 12a512f..c9d28bd 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -28,8 +28,8 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.S3Util; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index 48ad7ac..5ba56a5 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -27,9 +27,9 @@ import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.filter.FilterJSON; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.solr.common.util.Base64; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java index 872460b..ae0cfc0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.TimeZone; -import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.SolrUtil; import org.apache.ambari.logfeeder.view.VLogfeederFilter; import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java index 128c5c4..bc807193 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java @@ -22,7 +22,7 @@ package org.apache.ambari.logfeeder.logconfig; import java.util.ArrayList; import java.util.List; -import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; public enum LogfeederScheduler { http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java index 8691a19..b5e4eb3 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java @@ -22,9 +22,9 @@ package org.apache.ambari.logfeeder.logconfig.filter; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr; import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.view.VLogfeederFilter; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java index bf33f93..3a8eae9 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java @@ -21,8 +21,8 @@ package org.apache.ambari.logfeeder.logconfig.filter; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; /** http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java index 45ccc70..9aa0b23 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java @@ -23,7 +23,7 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java index e1f8f97..c692a9d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java @@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.mapper; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java index 7e530f5..e618261 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java @@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.mapper; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java new file mode 100644 index 0000000..0a0f4e9 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.metrics; + +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.log4j.Logger; + +// TODO: Refactor for failover +public class LogFeederAMSClient extends AbstractTimelineMetricsSink { + private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class); + + private String collectorHosts = null; + + public LogFeederAMSClient() { + collectorHosts = LogFeederUtil + .getStringProperty("logfeeder.metrics.collector.hosts"); + if (collectorHosts != null && collectorHosts.trim().length() == 0) { + collectorHosts = null; + } + if (collectorHosts != null) { + collectorHosts = collectorHosts.trim(); + } + logger.info("AMS collector URL=" + collectorHosts); + } + + @Override + public String getCollectorUri(String host) { + return collectorHosts; + } + + @Override + protected int getTimeoutSeconds() { + // TODO: Hard coded timeout + return 10; + } + + @Override + protected String getZookeeperQuorum() { + return null; + } + + @Override + protected String getConfiguredCollectors() { + return null; + } + + @Override + protected String getHostname() { + return null; + } + + @Override + protected boolean emitMetrics(TimelineMetrics metrics) { + return super.emitMetrics(metrics); + } + + @Override + protected String getCollectorProtocol() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java new file mode 100644 index 0000000..abb84c7 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.metrics; + +public class MetricCount { + public String metricsName = null; + public boolean isPointInTime = false; + + public long count = 0; + public long prevLogCount = 0; + public long prevLogMS = System.currentTimeMillis(); + public long prevPublishCount = 0; + public int publishCount = 0; // Count of published metrics. Used for first time sending metrics +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java new file mode 100644 index 0000000..33397c7 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.metrics; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeMap; + +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.log4j.Logger; + +public class MetricsMgr { + private static final Logger logger = Logger.getLogger(MetricsMgr.class); + + private boolean isMetricsEnabled = false; + private String nodeHostName = null; + private String appId = "logfeeder"; + + private long lastPublishTimeMS = 0; // Let's do the first publish immediately + private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock + + private int publishIntervalMS = 60 * 1000; + private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep + // the metrics in memory forever + private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>(); + private LogFeederAMSClient amsClient = null; + + public void init() { + logger.info("Initializing MetricsMgr()"); + amsClient = new LogFeederAMSClient(); + + if (amsClient.getCollectorUri(null) != null) { + nodeHostName = LogFeederUtil.getStringProperty("node.hostname"); + if (nodeHostName == null) { + try { + nodeHostName = InetAddress.getLocalHost().getHostName(); + } catch (Throwable e) { + logger.warn( + "Error getting hostname using InetAddress.getLocalHost().getHostName()", + e); + } + if (nodeHostName == null) { + try { + nodeHostName = InetAddress.getLocalHost() + .getCanonicalHostName(); + } catch (Throwable e) { + logger.warn( + "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", + e); + } + } + } + if (nodeHostName == null) { + isMetricsEnabled = false; + logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics"); + } else { + isMetricsEnabled = true; + logger.info("LogFeeder Metrics is enabled. Metrics host=" + + amsClient.getCollectorUri(null)); + } + } else { + logger.info("LogFeeder Metrics publish is disabled"); + } + } + + public boolean isMetricsEnabled() { + return isMetricsEnabled; + } + + synchronized public void useMetrics(List<MetricCount> metricsList) { + if (!isMetricsEnabled) { + return; + } + logger.info("useMetrics() metrics.size=" + metricsList.size()); + long currMS = System.currentTimeMillis(); + Long currMSLong = new Long(currMS); + for (MetricCount metric : metricsList) { + if (metric.metricsName == null) { + logger.debug("metric.metricsName is null"); + // Metrics is not meant to be published + continue; + } + long currCount = metric.count; + if (!metric.isPointInTime && metric.publishCount > 0 + && currCount <= metric.prevPublishCount) { + // No new data added, so let's ignore it + logger.debug("Nothing changed. " + metric.metricsName + + ", currCount=" + currCount + ", prevPublishCount=" + + metric.prevPublishCount); + continue; + } + metric.publishCount++; + + TimelineMetric timelineMetric = metricsMap.get(metric.metricsName); + if (timelineMetric == null) { + logger.debug("Creating new metric obbject for " + + metric.metricsName); + // First time for this metric + timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metric.metricsName); + timelineMetric.setHostName(nodeHostName); + timelineMetric.setAppId(appId); + timelineMetric.setStartTime(currMS); + timelineMetric.setType("Long"); + timelineMetric.setMetricValues(new TreeMap<Long, Double>()); + + metricsMap.put(metric.metricsName, timelineMetric); + } + logger.debug("Adding metrics=" + metric.metricsName); + if (metric.isPointInTime) { + timelineMetric.getMetricValues().put(currMSLong, + new Double(currCount)); + } else { + Double value = timelineMetric.getMetricValues().get(currMSLong); + if (value == null) { + value = new Double(0); + } + value += (currCount - metric.prevPublishCount); + timelineMetric.getMetricValues().put(currMSLong, value); + metric.prevPublishCount = currCount; + } + } + + if (metricsMap.size() > 0 + && currMS - lastPublishTimeMS > publishIntervalMS) { + try { + // Time to publish + TimelineMetrics timelineMetrics = new TimelineMetrics(); + List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>(); + timeLineMetricList.addAll(metricsMap.values()); + timelineMetrics.setMetrics(timeLineMetricList); + amsClient.emitMetrics(timelineMetrics); + logger.info("Published " + timeLineMetricList.size() + + " metrics to AMS"); + metricsMap.clear(); + timeLineMetricList.clear(); + lastPublishTimeMS = currMS; + } catch (Throwable t) { + logger.warn("Error sending metrics to AMS.", t); + if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) { + logger.error("AMS was not sent for last " + + maxMetricsBuffer + / 1000 + + " seconds. Purging it and will start rebuilding it again"); + metricsMap.clear(); + lastFailedPublishTimeMS = currMS; + } + } + } else { + logger.info("Not publishing metrics. metrics.size()=" + + metricsMap.size() + ", lastPublished=" + + (currMS - lastPublishTimeMS) / 1000 + + " seconds ago, intervalConfigured=" + publishIntervalMS + / 1000); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java index a4e0eda..6f84251 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java @@ -24,10 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.ambari.logfeeder.ConfigBlock; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.MetricCount; +import org.apache.ambari.logfeeder.common.ConfigBlock; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; public abstract class Output extends ConfigBlock { http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java index aef8dc5..18a5a54 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java @@ -26,8 +26,8 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java index f711a5f..a360215 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -19,12 +19,12 @@ package org.apache.ambari.logfeeder.output; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.spool.LogSpooler; import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; import org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; import org.apache.commons.lang3.StringUtils; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java index a7f2321..2595d87 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java @@ -25,8 +25,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedTransferQueue; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java new file mode 100644 index 0000000..0a6b7fa --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; +import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData; +import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class OutputMgr { + private static final Logger logger = Logger.getLogger(OutputMgr.class); + + private Collection<Output> outputList = new ArrayList<Output>(); + + private boolean addMessageMD5 = true; + + private int MAX_OUTPUT_SIZE = 32765; // 32766-1 + private static long doc_counter = 0; + private MetricCount messageTruncateMetric = new MetricCount(); + + + public Collection<Output> getOutputList() { + return outputList; + } + + public void setOutputList(Collection<Output> outputList) { + this.outputList = outputList; + } + + public void write(Map<String, Object> jsonObj, InputMarker inputMarker) { + Input input = inputMarker.input; + + // Update the block with the context fields + for (Map.Entry<String, String> entry : input.getContextFields() + .entrySet()) { + if (jsonObj.get(entry.getKey()) == null) { + jsonObj.put(entry.getKey(), entry.getValue()); + } + } + + // TODO: Ideally most of the overrides should be configurable + + // Add the input type + if (jsonObj.get("type") == null) { + jsonObj.put("type", input.getStringValue("type")); + } + if (jsonObj.get("path") == null && input.getFilePath() != null) { + jsonObj.put("path", input.getFilePath()); + } + if (jsonObj.get("path") == null && input.getStringValue("path") != null) { + jsonObj.put("path", input.getStringValue("path")); + } + + // Add host if required + if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) { + jsonObj.put("host", LogFeederUtil.hostName); + } + // Add IP if required + if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) { + jsonObj.put("ip", LogFeederUtil.ipAddress); + } + + //Add level + if (jsonObj.get("level") == null) { + jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN); + } + if (input.isUseEventMD5() || input.isGenEventMD5()) { + String prefix = ""; + Object logtimeObj = jsonObj.get("logtime"); + if (logtimeObj != null) { + if (logtimeObj instanceof Date) { + prefix = "" + ((Date) logtimeObj).getTime(); + } else { + prefix = logtimeObj.toString(); + } + } + Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson() + .toJson(jsonObj)); + if (input.isGenEventMD5()) { + jsonObj.put("event_md5", prefix + eventMD5.toString()); + } + if (input.isUseEventMD5()) { + jsonObj.put("id", prefix + eventMD5.toString()); + } + } + + // jsonObj.put("@timestamp", new Date()); + jsonObj.put("seq_num", new Long(doc_counter++)); + if (jsonObj.get("id") == null) { + jsonObj.put("id", UUID.randomUUID().toString()); + } + if (jsonObj.get("event_count") == null) { + jsonObj.put("event_count", new Integer(1)); + } + if (inputMarker.lineNumber > 0) { + jsonObj.put("logfile_line_number", new Integer( + inputMarker.lineNumber)); + } + if (jsonObj.containsKey("log_message")) { + // TODO: Let's check size only for log_message for now + String logMessage = (String) jsonObj.get("log_message"); + if (logMessage != null + && logMessage.getBytes().length > MAX_OUTPUT_SIZE) { + messageTruncateMetric.count++; + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_MESSAGESIZE"; + LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Message is too big. size=" + + logMessage.getBytes().length + ", input=" + + input.getShortDescription() + + ". Truncating to " + MAX_OUTPUT_SIZE + + ", first upto 100 characters=" + + LogFeederUtil.subString(logMessage, 100), + null, logger, Level.WARN); + logMessage = new String(logMessage.getBytes(), 0, + MAX_OUTPUT_SIZE); + jsonObj.put("log_message", logMessage); + // Add error tags + @SuppressWarnings("unchecked") + List<String> tagsList = (List<String>) jsonObj.get("tags"); + if (tagsList == null) { + tagsList = new ArrayList<String>(); + jsonObj.put("tags", tagsList); + } + tagsList.add("error_message_truncated"); + + } + if (addMessageMD5) { + jsonObj.put("message_md5", + "" + LogFeederUtil.genHash(logMessage)); + } + } + //check log is allowed to send output + if (FilterLogData.INSTANCE.isAllowed(jsonObj)) { + for (Output output : input.getOutputList()) { + try { + output.write(jsonObj, inputMarker); + } catch (Exception e) { + logger.error("Error writing. to " + output.getShortDescription(), e); + } + } + } + } + + public void write(String jsonBlock, InputMarker inputMarker) { + //check log is allowed to send output + if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) { + for (Output output : inputMarker.input.getOutputList()) { + try { + output.write(jsonBlock, inputMarker); + } catch (Exception e) { + logger.error("Error writing. to " + output.getShortDescription(), e); + } + } + } + } + + public void close() { + logger.info("Close called for outputs ..."); + for (Output output : outputList) { + try { + output.setDrain(true); + output.close(); + } catch (Exception e) { + // Ignore + } + } + // Need to get this value from property + int iterations = 30; + int waitTimeMS = 1000; + int i; + boolean allClosed = true; + for (i = 0; i < iterations; i++) { + allClosed = true; + for (Output output : outputList) { + if (!output.isClosed()) { + try { + allClosed = false; + logger.warn("Waiting for output to close. " + + output.getShortDescription() + ", " + + (iterations - i) + " more seconds"); + Thread.sleep(waitTimeMS); + } catch (Throwable t) { + // Ignore + } + } + } + if (allClosed) { + break; + } + } + + if (!allClosed) { + logger.warn("Some outpus were not closed. Iterations=" + i); + for (Output output : outputList) { + if (!output.isClosed()) { + logger.warn("Output not closed. Will ignore it." + + output.getShortDescription() + ", pendingCound=" + + output.getPendingCount()); + } + } + } else { + logger.info("All outputs are closed. Iterations=" + i); + } + } + + public void logStats() { + for (Output output : outputList) { + output.logStat(); + } + LogFeederUtil.logStatForMetric(messageTruncateMetric, + "Stat: Messages Truncated", null); + } + + public void addMetricsContainers(List<MetricCount> metricsList) { + metricsList.add(messageTruncateMetric); + for (Output output : outputList) { + output.addMetricsContainers(metricsList); + } + } + + + public void copyFile(File inputFile, InputMarker inputMarker) { + Input input = inputMarker.input; + for (Output output : input.getOutputList()) { + try { + output.copyFile(inputFile, inputMarker); + }catch (Exception e) { + logger.error("Error coyping file . to " + output.getShortDescription(), + e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index cbc1045..e95f8df 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -22,14 +22,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.ambari.logfeeder.LogFeeder; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.output.spool.LogSpooler; import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; import org.apache.ambari.logfeeder.output.spool.RolloverCondition; import org.apache.ambari.logfeeder.output.spool.RolloverHandler; -import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.S3Util; import org.apache.log4j.Logger; import java.io.File; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index b4dac72..cd9ce4d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -33,9 +33,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java index 1bbf33e..58282e0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java @@ -18,9 +18,9 @@ package org.apache.ambari.logfeeder.output; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.PlaceholderUtil; +import org.apache.ambari.logfeeder.util.S3Util; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java index fb597d3..485b0d4 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java @@ -18,11 +18,11 @@ package org.apache.ambari.logfeeder.output; -import org.apache.ambari.logfeeder.ConfigBlock; - import java.util.HashMap; import java.util.Map; +import org.apache.ambari.logfeeder.common.ConfigBlock; + /** * Holds all configuration relevant for S3 upload. */ http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java index dec685f..fd59c51 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java @@ -19,9 +19,9 @@ package org.apache.ambari.logfeeder.output; import com.google.common.annotations.VisibleForTesting; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.s3.S3Util; + import org.apache.ambari.logfeeder.util.CompressionUtil; +import org.apache.ambari.logfeeder.util.S3Util; import org.apache.log4j.Logger; import java.io.File; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java deleted file mode 100644 index d0fbb6c..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.s3; - -import org.apache.log4j.Logger; - -import com.amazonaws.AmazonServiceException; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient; - -public enum AWSUtil { - INSTANCE; - private static final Logger LOG = Logger.getLogger(AWSUtil.class); - - public String getAwsUserName(String accessKey, String secretKey) { - String username = null; - AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey); - AmazonIdentityManagementClient amazonIdentityManagementClient; - if (awsCredentials != null) { - amazonIdentityManagementClient = new AmazonIdentityManagementClient( - awsCredentials); - } else { - // create default client - amazonIdentityManagementClient = new AmazonIdentityManagementClient(); - } - try { - username = amazonIdentityManagementClient.getUser().getUser() - .getUserName(); - } catch (AmazonServiceException e) { - if (e.getErrorCode().compareTo("AccessDenied") == 0) { - String arn = null; - String msg = e.getMessage(); - int arnIdx = msg.indexOf("arn:aws"); - if (arnIdx != -1) { - int arnSpace = msg.indexOf(" ", arnIdx); - // should be similar to "arn:aws:iam::111111111111:user/username" - arn = msg.substring(arnIdx, arnSpace); - } - if (arn != null) { - String[] arnParts = arn.split(":"); - if (arnParts != null && arnParts.length > 5) { - username = arnParts[5]; - if (username != null) { - username = username.replace("user/", ""); - } - } - } - } - } catch (Exception exception) { - LOG.error( - "Error in getting username :" + exception.getLocalizedMessage(), - exception.getCause()); - } - return username; - } - - public AWSCredentials createAWSCredentials(String accessKey, String secretKey) { - if (accessKey != null && secretKey != null) { - LOG.debug("Creating aws client as per new accesskey and secretkey"); - AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, - secretKey); - return awsCredentials; - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java deleted file mode 100644 index db187be..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.s3; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.zip.GZIPInputStream; - -import org.apache.commons.io.IOUtils; -import org.apache.log4j.Logger; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.Upload; - -/** - * Utility to connect to s3 - */ -public class S3Util { - public static final S3Util INSTANCE = new S3Util(); - - private static final Logger LOG = Logger.getLogger(S3Util.class); - - public static final String S3_PATH_START_WITH = "s3://"; - public static final String S3_PATH_SEPARATOR = "/"; - - public AmazonS3 getS3Client(String accessKey, String secretKey) { - AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( - accessKey, secretKey); - AmazonS3 s3client; - if (awsCredentials != null) { - s3client = new AmazonS3Client(awsCredentials); - } else { - s3client = new AmazonS3Client(); - } - return s3client; - } - - public TransferManager getTransferManager(String accessKey, String secretKey) { - AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( - accessKey, secretKey); - TransferManager transferManager; - if (awsCredentials != null) { - transferManager = new TransferManager(awsCredentials); - } else { - transferManager = new TransferManager(); - } - return transferManager; - } - - public void shutdownTransferManager(TransferManager transferManager) { - if (transferManager != null) { - transferManager.shutdownNow(); - } - } - - public String getBucketName(String s3Path) { - String bucketName = null; - // s3path - if (s3Path != null) { - String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split( - S3_PATH_SEPARATOR); - bucketName = s3PathParts[0]; - } - return bucketName; - } - - public String getS3Key(String s3Path) { - StringBuilder s3Key = new StringBuilder(); - // s3path - if (s3Path != null) { - String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split( - S3_PATH_SEPARATOR); - ArrayList<String> s3PathList = new ArrayList<String>( - Arrays.asList(s3PathParts)); - s3PathList.remove(0);// remove bucketName - for (int index = 0; index < s3PathList.size(); index++) { - if (index > 0) { - s3Key.append(S3_PATH_SEPARATOR); - } - s3Key.append(s3PathList.get(index)); - } - } - return s3Key.toString(); - } - - public void uploadFileTos3(String bucketName, String s3Key, File localFile, - String accessKey, String secretKey) { - TransferManager transferManager = getTransferManager(accessKey, secretKey); - try { - Upload upload = transferManager.upload(bucketName, s3Key, localFile); - upload.waitForUploadResult(); - } catch (AmazonClientException | InterruptedException e) { - LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), - e); - } finally { - shutdownTransferManager(transferManager); - } - } - - /** - * Get the buffer reader to read s3 file as a stream - */ - public BufferedReader getReader(String s3Path, String accessKey, - String secretKey) throws IOException { - // TODO error handling - // Compression support - // read header and decide the compression(auto detection) - // For now hard-code GZIP compression - String s3Bucket = getBucketName(s3Path); - String s3Key = getS3Key(s3Path); - S3Object fileObj = getS3Client(accessKey, secretKey).getObject( - new GetObjectRequest(s3Bucket, s3Key)); - GZIPInputStream objectInputStream; - try { - objectInputStream = new GZIPInputStream(fileObj.getObjectContent()); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader( - objectInputStream)); - return bufferedReader; - } catch (IOException e) { - LOG.error("Error in creating stream reader for s3 file :" + s3Path, - e.getCause()); - throw e; - } - } - - public void writeIntoS3File(String data, String bucketName, String s3Key, - String accessKey, String secretKey) { - InputStream in = null; - try { - in = IOUtils.toInputStream(data, "UTF-8"); - } catch (IOException e) { - LOG.error(e); - } - if (in != null) { - TransferManager transferManager = getTransferManager(accessKey, secretKey); - try { - if (transferManager != null) { - transferManager.upload( - new PutObjectRequest(bucketName, s3Key, in, - new ObjectMetadata())).waitForUploadResult(); - LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" - + bucketName); - } - } catch (AmazonClientException | InterruptedException e) { - LOG.error(e); - } finally { - try { - shutdownTransferManager(transferManager); - in.close(); - } catch (IOException e) { - // ignore - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java new file mode 100644 index 0000000..15f7594 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import org.apache.log4j.Logger; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient; + +public enum AWSUtil { + INSTANCE; + private static final Logger LOG = Logger.getLogger(AWSUtil.class); + + public String getAwsUserName(String accessKey, String secretKey) { + String username = null; + AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey); + AmazonIdentityManagementClient amazonIdentityManagementClient; + if (awsCredentials != null) { + amazonIdentityManagementClient = new AmazonIdentityManagementClient( + awsCredentials); + } else { + // create default client + amazonIdentityManagementClient = new AmazonIdentityManagementClient(); + } + try { + username = amazonIdentityManagementClient.getUser().getUser() + .getUserName(); + } catch (AmazonServiceException e) { + if (e.getErrorCode().compareTo("AccessDenied") == 0) { + String arn = null; + String msg = e.getMessage(); + int arnIdx = msg.indexOf("arn:aws"); + if (arnIdx != -1) { + int arnSpace = msg.indexOf(" ", arnIdx); + // should be similar to "arn:aws:iam::111111111111:user/username" + arn = msg.substring(arnIdx, arnSpace); + } + if (arn != null) { + String[] arnParts = arn.split(":"); + if (arnParts != null && arnParts.length > 5) { + username = arnParts[5]; + if (username != null) { + username = username.replace("user/", ""); + } + } + } + } + } catch (Exception exception) { + LOG.error( + "Error in getting username :" + exception.getLocalizedMessage(), + exception.getCause()); + } + return username; + } + + public AWSCredentials createAWSCredentials(String accessKey, String secretKey) { + if (accessKey != null && secretKey != null) { + LOG.debug("Creating aws client as per new accesskey and secretkey"); + AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, + secretKey); + return awsCredentials; + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java new file mode 100644 index 0000000..a92ba29 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import java.io.File; +import java.util.HashMap; + +import org.apache.log4j.Logger; + +public class AliasUtil { + + private static Logger logger = Logger.getLogger(AliasUtil.class); + + private static AliasUtil instance = null; + + private static String aliasConfigJson = "alias_config.json"; + + private HashMap<String, Object> aliasMap = null; + + public static enum ALIAS_TYPE { + INPUT, FILTER, MAPPER, OUTPUT + } + + public static enum ALIAS_PARAM { + KLASS + } + + private AliasUtil() { + init(); + } + + public static AliasUtil getInstance() { + if (instance == null) { + synchronized (AliasUtil.class) { + if (instance == null) { + instance = new AliasUtil(); + } + } + } + return instance; + } + + /** + */ + private void init() { + File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson); + if (jsonFile != null) { + this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile); + } + + } + + + public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) { + String result = key;// key as a default value; + HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype); + String value = aliasInfo.get(aliasParam.name().toLowerCase()); + if (value != null && !value.isEmpty()) { + result = value; + logger.debug("Alias found for key :" + key + ", param :" + aliasParam.name().toLowerCase() + ", value :" + + value + " aliastype:" + aliastype.name()); + } else { + logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase()); + } + return result; + } + + @SuppressWarnings("unchecked") + private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) { + HashMap<String, String> aliasInfo = null; + if (aliasMap != null) { + String typeKey = aliastype.name().toLowerCase(); + HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey); + if (typeJson != null) { + aliasInfo = (HashMap<String, String>) typeJson.get(key); + } + } + if (aliasInfo == null) { + aliasInfo = new HashMap<String, String>(); + } + return aliasInfo; + } +}
