AMBARI-18236. Fix package structure in Logfeeder (Miklos Gergely via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/080c1ba9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/080c1ba9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/080c1ba9

Branch: refs/heads/branch-dev-logsearch
Commit: 080c1ba9cee8257478ad9bc1778d35f7fd400c49
Parents: ffcf532
Author: Miklos Gergely <[email protected]>
Authored: Tue Aug 23 17:55:15 2016 +0200
Committer: oleewere <[email protected]>
Committed: Tue Aug 23 17:55:15 2016 +0200

----------------------------------------------------------------------
 .../org/apache/ambari/logfeeder/AliasUtil.java  |  99 ----
 .../apache/ambari/logfeeder/ConfigBlock.java    | 260 ---------
 .../org/apache/ambari/logfeeder/InputMgr.java   | 451 ---------------
 .../org/apache/ambari/logfeeder/LogFeeder.java  |  10 +-
 .../ambari/logfeeder/LogFeederAMSClient.java    |  80 ---
 .../apache/ambari/logfeeder/LogFeederUtil.java  | 556 ------------------
 .../apache/ambari/logfeeder/MetricCount.java    |  31 --
 .../org/apache/ambari/logfeeder/MetricsMgr.java | 177 ------
 .../org/apache/ambari/logfeeder/MurmurHash.java | 163 ------
 .../org/apache/ambari/logfeeder/OutputMgr.java  | 262 ---------
 .../ambari/logfeeder/common/ConfigBlock.java    | 263 +++++++++
 .../logfeeder/common/LogfeederException.java    |  31 ++
 .../logfeeder/exception/LogfeederException.java |  31 --
 .../apache/ambari/logfeeder/filter/Filter.java  |  16 +-
 .../ambari/logfeeder/filter/FilterGrok.java     |   6 +-
 .../ambari/logfeeder/filter/FilterJSON.java     |   4 +-
 .../ambari/logfeeder/filter/FilterKeyValue.java |   6 +-
 .../apache/ambari/logfeeder/input/Input.java    |   9 +-
 .../ambari/logfeeder/input/InputFile.java       |   2 +-
 .../apache/ambari/logfeeder/input/InputMgr.java | 451 +++++++++++++++
 .../ambari/logfeeder/input/InputS3File.java     |   4 +-
 .../ambari/logfeeder/input/InputSimulate.java   |   2 +-
 .../logconfig/FetchConfigFromSolr.java          |   2 +-
 .../logfeeder/logconfig/LogfeederScheduler.java |   2 +-
 .../logconfig/filter/ApplyLogFilter.java        |   2 +-
 .../logconfig/filter/FilterLogData.java         |   2 +-
 .../ambari/logfeeder/mapper/MapperDate.java     |   2 +-
 .../logfeeder/mapper/MapperFieldName.java       |   2 +-
 .../logfeeder/mapper/MapperFieldValue.java      |   2 +-
 .../logfeeder/metrics/LogFeederAMSClient.java   |  81 +++
 .../ambari/logfeeder/metrics/MetricCount.java   |  31 ++
 .../ambari/logfeeder/metrics/MetricsMgr.java    | 178 ++++++
 .../apache/ambari/logfeeder/output/Output.java  |   6 +-
 .../ambari/logfeeder/output/OutputFile.java     |   2 +-
 .../ambari/logfeeder/output/OutputHDFSFile.java |   2 +-
 .../ambari/logfeeder/output/OutputKafka.java    |   2 +-
 .../ambari/logfeeder/output/OutputMgr.java      | 263 +++++++++
 .../ambari/logfeeder/output/OutputS3File.java   |   4 +-
 .../ambari/logfeeder/output/OutputSolr.java     |   2 +-
 .../logfeeder/output/S3LogPathResolver.java     |   4 +-
 .../logfeeder/output/S3OutputConfiguration.java |   4 +-
 .../ambari/logfeeder/output/S3Uploader.java     |   4 +-
 .../org/apache/ambari/logfeeder/s3/AWSUtil.java |  84 ---
 .../org/apache/ambari/logfeeder/s3/S3Util.java  | 186 -------
 .../apache/ambari/logfeeder/util/AWSUtil.java   |  84 +++
 .../apache/ambari/logfeeder/util/AliasUtil.java |  99 ++++
 .../ambari/logfeeder/util/LogFeederUtil.java    | 557 +++++++++++++++++++
 .../ambari/logfeeder/util/MurmurHash.java       | 163 ++++++
 .../apache/ambari/logfeeder/util/S3Util.java    | 186 +++++++
 .../apache/ambari/logfeeder/util/SolrUtil.java  |   1 -
 .../ambari/logfeeder/filter/FilterGrokTest.java |   2 +-
 .../ambari/logfeeder/filter/FilterJSONTest.java |   6 +-
 .../logfeeder/filter/FilterKeyValueTest.java    |   2 +-
 .../ambari/logfeeder/input/InputFileTest.java   |   1 -
 .../ambari/logfeeder/mapper/MapperDateTest.java |   2 +-
 .../logfeeder/output/S3LogPathResolverTest.java |   3 +-
 .../ambari/logfeeder/output/S3UploaderTest.java |   2 +-
 .../apache/ambari/logfeeder/s3/AWSUtilTest.java |  27 -
 .../apache/ambari/logfeeder/s3/S3UtilTest.java  |  38 --
 .../ambari/logfeeder/util/AWSUtilTest.java      |  29 +
 .../ambari/logfeeder/util/S3UtilTest.java       |  40 ++
 61 files changed, 2519 insertions(+), 2504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
deleted file mode 100644
index 44bc829..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder;
-
-import java.io.File;
-import java.util.HashMap;
-
-import org.apache.log4j.Logger;
-
-public class AliasUtil {
-
-  private static Logger logger = Logger.getLogger(AliasUtil.class);
-
-  private static AliasUtil instance = null;
-
-  private static String aliasConfigJson = "alias_config.json";
-
-  private HashMap<String, Object> aliasMap = null;
-
-  public static enum ALIAS_TYPE {
-    INPUT, FILTER, MAPPER, OUTPUT
-  }
-
-  public static enum ALIAS_PARAM {
-    KLASS
-  }
-
-  private AliasUtil() {
-    init();
-  }
-
-  public static AliasUtil getInstance() {
-    if (instance == null) {
-      synchronized (AliasUtil.class) {
-        if (instance == null) {
-          instance = new AliasUtil();
-        }
-      }
-    }
-    return instance;
-  }
-
-  /**
-   */
-  private void init() {
-    File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson);
-    if (jsonFile != null) {
-      this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile);
-    }
-
-  }
-
-
-  public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) {
-    String result = key;// key as a default value;
-    HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
-    String value = aliasInfo.get(aliasParam.name().toLowerCase());
-    if (value != null && !value.isEmpty()) {
-      result = value;
-      logger.debug("Alias found for key :" + key + ",  param :" + aliasParam.name().toLowerCase() + ", value :"
-        + value + " aliastype:" + aliastype.name());
-    } else {
-      logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase());
-    }
-    return result;
-  }
-
-  @SuppressWarnings("unchecked")
-  private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) {
-    HashMap<String, String> aliasInfo = null;
-    if (aliasMap != null) {
-      String typeKey = aliastype.name().toLowerCase();
-      HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
-      if (typeJson != null) {
-        aliasInfo = (HashMap<String, String>) typeJson.get(key);
-      }
-    }
-    if (aliasInfo == null) {
-      aliasInfo = new HashMap<String, String>();
-    }
-    return aliasInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
deleted file mode 100644
index c3ccc47..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Priority;
-
-
-public abstract class ConfigBlock {
-  static private Logger logger = Logger.getLogger(ConfigBlock.class);
-
-  private boolean drain = false;
-
-  protected Map<String, Object> configs;
-  protected Map<String, String> contextFields = new HashMap<String, String>();
-  public MetricCount statMetric = new MetricCount();
-
-  /**
-   *
-   */
-  public ConfigBlock() {
-    super();
-  }
-
-  /**
-   * Used while logging. Keep it short and meaningful
-   */
-  public abstract String getShortDescription();
-
-  /**
-   * Every implementor need to give name to the thread they create
-   */
-  public String getNameForThread() {
-    return this.getClass().getSimpleName();
-  }
-
-  /**
-   * @param metricsList
-   */
-  public void addMetricsContainers(List<MetricCount> metricsList) {
-    metricsList.add(statMetric);
-  }
-
-  /**
-   * This method needs to be overwritten by deriving classes.
-   */
-  public void init() throws Exception {
-  }
-
-  public void loadConfig(Map<String, Object> map) {
-    configs = LogFeederUtil.cloneObject(map);
-
-    Map<String, String> nvList = getNVList("add_fields");
-    if (nvList != null) {
-      contextFields.putAll(nvList);
-    }
-  }
-
-  public Map<String, Object> getConfigs() {
-    return configs;
-  }
-
-  @SuppressWarnings("unchecked")
-  public boolean isEnabled() {
-    boolean isEnabled = getBooleanValue("is_enabled", true);
-    if (isEnabled) {
-      // Let's check for static conditions
-      Map<String, Object> conditions = (Map<String, Object>) configs
-        .get("conditions");
-      boolean allow = true;
-      if (conditions != null && conditions.size() > 0) {
-        allow = false;
-        for (String conditionType : conditions.keySet()) {
-          if (conditionType.equalsIgnoreCase("fields")) {
-            Map<String, Object> fields = (Map<String, Object>) conditions
-              .get("fields");
-            for (String fieldName : fields.keySet()) {
-              Object values = fields.get(fieldName);
-              if (values instanceof String) {
-                allow = isFieldConditionMatch(fieldName,
-                  (String) values);
-              } else {
-                List<String> listValues = (List<String>) values;
-                for (String stringValue : listValues) {
-                  allow = isFieldConditionMatch(fieldName,
-                    stringValue);
-                  if (allow) {
-                    break;
-                  }
-                }
-              }
-              if (allow) {
-                break;
-              }
-            }
-          }
-          if (allow) {
-            break;
-          }
-        }
-        isEnabled = allow;
-      }
-    }
-    return isEnabled;
-  }
-
-  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
-    boolean allow = false;
-    String fieldValue = (String) configs.get(fieldName);
-    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
-      allow = true;
-    } else {
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) configs
-        .get("add_fields");
-      if (addFields != null && addFields.get(fieldName) != null) {
-        String addFieldValue = (String) addFields.get(fieldName);
-        if (stringValue.equalsIgnoreCase(addFieldValue)) {
-          allow = true;
-        }
-      }
-
-    }
-    return allow;
-  }
-
-  @SuppressWarnings("unchecked")
-  public Map<String, String> getNVList(String key) {
-    return (Map<String, String>) configs.get(key);
-  }
-
-  public String getStringValue(String key) {
-    Object value = configs.get(key);
-    if (value != null && value.toString().equalsIgnoreCase("none")) {
-      value = null;
-    }
-    if (value != null) {
-      return value.toString();
-    }
-    return null;
-  }
-
-  public String getStringValue(String key, String defaultValue) {
-    Object value = configs.get(key);
-    if (value != null && value.toString().equalsIgnoreCase("none")) {
-      value = null;
-    }
-
-    if (value != null) {
-      return value.toString();
-    }
-    return defaultValue;
-  }
-
-  public Object getConfigValue(String key) {
-    return configs.get(key);
-  }
-
-  public boolean getBooleanValue(String key, boolean defaultValue) {
-    String strValue = getStringValue(key);
-    boolean retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      if (strValue.equalsIgnoreCase("true")
-        || strValue.equalsIgnoreCase("yes")) {
-        retValue = true;
-      } else {
-        retValue = false;
-      }
-    }
-    return retValue;
-  }
-
-  public int getIntValue(String key, int defaultValue) {
-    String strValue = getStringValue(key);
-    int retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      try {
-        retValue = Integer.parseInt(strValue);
-      } catch (Throwable t) {
-        logger.error("Error parsing integer value. key=" + key
-          + ", value=" + strValue);
-      }
-    }
-    return retValue;
-  }
-  
-  public long getLongValue(String key, long defaultValue) {
-    String strValue = getStringValue(key);
-    Long retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      try {
-        retValue = Long.parseLong(strValue);
-      } catch (Throwable t) {
-        logger.error("Error parsing long value. key=" + key + ", value="
-            + strValue);
-      }
-    }
-    return retValue;
-  }
-
-  public Map<String, String> getContextFields() {
-    return contextFields;
-  }
-
-  public void incrementStat(int count) {
-    statMetric.count += count;
-  }
-
-  public void logStatForMetric(MetricCount metric, String prefixStr) {
-    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key="
-      + getShortDescription());
-  }
-
-  synchronized public void logStat() {
-    logStatForMetric(statMetric, "Stat");
-  }
-
-  public boolean logConfgs(Priority level) {
-    if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) {
-      return false;
-    }
-    if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) {
-      return false;
-    }
-    logger.log(level, "Printing configuration Block="
-      + getShortDescription());
-    logger.log(level, "configs=" + configs);
-    logger.log(level, "contextFields=" + contextFields);
-    return true;
-  }
-
-  public boolean isDrain() {
-    return drain;
-  }
-
-  public void setDrain(boolean drain) {
-    this.drain = drain;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
deleted file mode 100644
index fa60702..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-public class InputMgr {
-  private static final Logger logger = Logger.getLogger(InputMgr.class);
-
-  private List<Input> inputList = new ArrayList<Input>();
-  private Set<Input> notReadyList = new HashSet<Input>();
-
-  private boolean isDrain = false;
-  private boolean isAnyInputTail = false;
-
-  private String checkPointSubFolderName = "logfeeder_checkpoints";
-  private File checkPointFolderFile = null;
-
-  private MetricCount filesCountMetric = new MetricCount();
-
-  private String checkPointExtension = ".cp";
-  
-  private Thread inputIsReadyMonitor = null;
-
-  public List<Input> getInputList() {
-    return inputList;
-  }
-
-  public void add(Input input) {
-    inputList.add(input);
-  }
-
-  public void removeInput(Input input) {
-    logger.info("Trying to remove from inputList. "
-      + input.getShortDescription());
-    Iterator<Input> iter = inputList.iterator();
-    while (iter.hasNext()) {
-      Input iterInput = iter.next();
-      if (iterInput.equals(input)) {
-        logger.info("Removing Input from inputList. "
-          + input.getShortDescription());
-        iter.remove();
-      }
-    }
-  }
-
-  public int getActiveFilesCount() {
-    int count = 0;
-    for (Input input : inputList) {
-      if (input.isReady()) {
-        count++;
-      }
-    }
-    return count;
-  }
-
-  public void init() {
-    filesCountMetric.metricsName = "input.files.count";
-    filesCountMetric.isPointInTime = true;
-
-    checkPointExtension = LogFeederUtil.getStringProperty(
-      "logfeeder.checkpoint.extension", checkPointExtension);
-    for (Input input : inputList) {
-      try {
-        input.init();
-        if (input.isTail()) {
-          isAnyInputTail = true;
-        }
-      } catch (Exception e) {
-        logger.error(
-          "Error initializing input. "
-            + input.getShortDescription(), e);
-      }
-    }
-
-    if (isAnyInputTail) {
-      logger.info("Determining valid checkpoint folder");
-      boolean isCheckPointFolderValid = false;
-      // We need to keep track of the files we are reading.
-      String checkPointFolder = LogFeederUtil
-        .getStringProperty("logfeeder.checkpoint.folder");
-      if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
-        checkPointFolderFile = new File(checkPointFolder);
-        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-      }
-      if (!isCheckPointFolderValid) {
-        // Let's try home folder
-        String userHome = LogFeederUtil.getStringProperty("user.home");
-        if (userHome != null) {
-          checkPointFolderFile = new File(userHome,
-            checkPointSubFolderName);
-          logger.info("Checking if home folder can be used for checkpoints. Folder="
-            + checkPointFolderFile);
-          isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-        }
-      }
-      if (!isCheckPointFolderValid) {
-        // Let's use tmp folder
-        String tmpFolder = LogFeederUtil
-          .getStringProperty("java.io.tmpdir");
-        if (tmpFolder == null) {
-          tmpFolder = "/tmp";
-        }
-        checkPointFolderFile = new File(tmpFolder,
-          checkPointSubFolderName);
-        logger.info("Checking if tmps folder can be used for checkpoints. Folder="
-          + checkPointFolderFile);
-        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-        if (isCheckPointFolderValid) {
-          logger.warn("Using tmp folder "
-            + checkPointFolderFile
-            + " to store check points. This is not recommended."
-            + "Please set logfeeder.checkpoint.folder property");
-        }
-      }
-
-      if (isCheckPointFolderValid) {
-        logger.info("Using folder " + checkPointFolderFile
-          + " for storing checkpoints");
-      }
-    }
-
-  }
-
-  public File getCheckPointFolderFile() {
-    return checkPointFolderFile;
-  }
-
-  private boolean verifyCheckPointFolder(File folderPathFile) {
-    if (!folderPathFile.exists()) {
-      // Create the folder
-      try {
-        if (!folderPathFile.mkdir()) {
-          logger.warn("Error creating folder for check point. folder="
-            + folderPathFile);
-        }
-      } catch (Throwable t) {
-        logger.warn("Error creating folder for check point. folder="
-          + folderPathFile, t);
-      }
-    }
-
-    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
-      // Let's check whether we can create a file
-      File testFile = new File(folderPathFile, UUID.randomUUID()
-        .toString());
-      try {
-        testFile.createNewFile();
-        return testFile.delete();
-      } catch (IOException e) {
-        logger.warn(
-          "Couldn't create test file in "
-            + folderPathFile.getAbsolutePath()
-            + " for checkPoint", e);
-      }
-    }
-    return false;
-  }
-
-  public void monitor() {
-    for (Input input : inputList) {
-      if (input.isReady()) {
-        input.monitor();
-      } else {
-        if (input.isTail()) {
-          logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
-            + input.getShortDescription());
-          notReadyList.add(input);
-        } else {
-          logger.info("Input is not ready, so going to ignore it "
-            + input.getShortDescription());
-        }
-      }
-    }
-    // Start the monitoring thread if any file is in tail mode
-    if (isAnyInputTail) {
-       inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
-        @Override
-        public void run() {
-          logger.info("Going to monitor for these missing files: "
-            + notReadyList.toString());
-          while (true) {
-            if (isDrain) {
-              logger.info("Exiting missing file monitor.");
-              break;
-            }
-            try {
-              Iterator<Input> iter = notReadyList.iterator();
-              while (iter.hasNext()) {
-                Input input = iter.next();
-                try {
-                  if (input.isReady()) {
-                    input.monitor();
-                    iter.remove();
-                  }
-                } catch (Throwable t) {
-                  logger.error("Error while enabling monitoring for input. "
-                    + input.getShortDescription());
-                }
-              }
-              Thread.sleep(30 * 1000);
-            } catch (Throwable t) {
-              // Ignore
-            }
-          }
-        }
-      };
-      inputIsReadyMonitor.start();
-    }
-  }
-
-  public void addToNotReady(Input notReadyInput) {
-    notReadyList.add(notReadyInput);
-  }
-
-  public void addMetricsContainers(List<MetricCount> metricsList) {
-    for (Input input : inputList) {
-      input.addMetricsContainers(metricsList);
-    }
-    filesCountMetric.count = getActiveFilesCount();
-    metricsList.add(filesCountMetric);
-  }
-
-  public void logStats() {
-    for (Input input : inputList) {
-      input.logStat();
-    }
-
-    filesCountMetric.count = getActiveFilesCount();
-    LogFeederUtil.logStatForMetric(filesCountMetric,
-      "Stat: Files Monitored Count", null);
-  }
-
-  public void close() {
-    for (Input input : inputList) {
-      try {
-        input.setDrain(true);
-      } catch (Throwable t) {
-        logger.error(
-          "Error while draining. input="
-            + input.getShortDescription(), t);
-      }
-    }
-    isDrain = true;
-
-    // Need to get this value from property
-    int iterations = 30;
-    int waitTimeMS = 1000;
-    int i = 0;
-    boolean allClosed = true;
-    for (i = 0; i < iterations; i++) {
-      allClosed = true;
-      for (Input input : inputList) {
-        if (!input.isClosed()) {
-          try {
-            allClosed = false;
-            logger.warn("Waiting for input to close. "
-              + input.getShortDescription() + ", "
-              + (iterations - i) + " more seconds");
-            Thread.sleep(waitTimeMS);
-          } catch (Throwable t) {
-            // Ignore
-          }
-        }
-      }
-      if (allClosed) {
-        break;
-      }
-    }
-    if (!allClosed) {
-      logger.warn("Some inputs were not closed. Iterations=" + i);
-      for (Input input : inputList) {
-        if (!input.isClosed()) {
-          logger.warn("Input not closed. Will ignore it."
-            + input.getShortDescription());
-        }
-      }
-    } else {
-      logger.info("All inputs are closed. Iterations=" + i);
-    }
-
-  }
-
-  public void checkInAll() {
-    for (Input input : inputList) {
-      input.checkIn();
-    }
-  }
-
-  public void cleanCheckPointFiles() {
-
-    if (checkPointFolderFile == null) {
-      logger.info("Will not clean checkPoint files. checkPointFolderFile="
-        + checkPointFolderFile);
-      return;
-    }
-    logger.info("Cleaning checkPoint files. checkPointFolderFile="
-      + checkPointFolderFile.getAbsolutePath());
-    try {
-      // Loop over the check point files and if filePath is not present, then move to closed
-      String searchPath = "*" + checkPointExtension;
-      FileFilter fileFilter = new WildcardFileFilter(searchPath);
-      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
-      int totalCheckFilesDeleted = 0;
-      for (File checkPointFile : checkPointFiles) {
-        RandomAccessFile checkPointReader = null;
-        try {
-          checkPointReader = new RandomAccessFile(checkPointFile, "r");
-
-          int contentSize = checkPointReader.readInt();
-          byte b[] = new byte[contentSize];
-          int readSize = checkPointReader.read(b, 0, contentSize);
-          if (readSize != contentSize) {
-            logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
-              + contentSize
-              + ", read="
-              + readSize
-              + ", checkPointFile=" + checkPointFile);
-          } else {
-            // Create JSON string
-            String jsonCheckPointStr = new String(b, 0, readSize);
-            Map<String, Object> jsonCheckPoint = LogFeederUtil
-              .toJSONObject(jsonCheckPointStr);
-
-            String logFilePath = (String) jsonCheckPoint
-              .get("file_path");
-            String logFileKey = (String) jsonCheckPoint
-              .get("file_key");
-            if (logFilePath != null && logFileKey != null) {
-              boolean deleteCheckPointFile = false;
-              File logFile = new File(logFilePath);
-              if (logFile.exists()) {
-                Object fileKeyObj = InputFile
-                  .getFileKey(logFile);
-                String fileBase64 = Base64
-                  .byteArrayToBase64(fileKeyObj
-                    .toString().getBytes());
-                if (!logFileKey.equals(fileBase64)) {
-                  deleteCheckPointFile = true;
-                  logger.info("CheckPoint clean: File key has changed. old="
-                    + logFileKey
-                    + ", new="
-                    + fileBase64
-                    + ", filePath="
-                    + logFilePath
-                    + ", checkPointFile="
-                    + checkPointFile.getAbsolutePath());
-                }
-              } else {
-                logger.info("CheckPoint clean: Log file doesn't exist. filePath="
-                  + logFilePath
-                  + ", checkPointFile="
-                  + checkPointFile.getAbsolutePath());
-                deleteCheckPointFile = true;
-              }
-              if (deleteCheckPointFile) {
-                logger.info("Deleting CheckPoint file="
-                  + checkPointFile.getAbsolutePath()
-                  + ", logFile=" + logFilePath);
-                checkPointFile.delete();
-                totalCheckFilesDeleted++;
-              }
-            }
-          }
-        } catch (EOFException eof) {
-          logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
-            + checkPointFile);
-        } catch (Throwable t) {
-          logger.error("Error while checking checkPoint file. "
-            + checkPointFile, t);
-        } finally {
-          if (checkPointReader != null) {
-            try {
-              checkPointReader.close();
-            } catch (Throwable t) {
-              logger.error("Error closing checkPoint file. "
-                + checkPointFile, t);
-            }
-          }
-        }
-      }
-      logger.info("Deleted " + totalCheckFilesDeleted
-        + " checkPoint file(s). checkPointFolderFile="
-        + checkPointFolderFile.getAbsolutePath());
-
-    } catch (Throwable t) {
-      logger.error("Error while cleaning checkPointFiles", t);
-    }
-  }
-
-  public void waitOnAllInputs() {
-    //wait on inputs
-    if (inputList != null) {
-      for (Input input : inputList) {
-        if (input != null) {
-          Thread inputThread = input.getThread();
-          if (inputThread != null) {
-            try {
-              inputThread.join();
-            } catch (InterruptedException e) {
-              // ignore
-            }
-          }
-        }
-      }
-    }
-    // wait on monitor
-    if (inputIsReadyMonitor != null) {
-      try {
-        this.close();
-        inputIsReadyMonitor.join();
-      } catch (InterruptedException e) {
-        // ignore
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 3cf0fff..373d743 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -37,14 +37,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMgr;
 import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricsMgr;
 import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.util.AliasUtil;
 import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
deleted file mode 100644
index da61d83..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.log4j.Logger;
-
-// TODO: Refactor for failover
-public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
-  private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class);
-
-  private String collectorHosts = null;
-
-  public LogFeederAMSClient() {
-    collectorHosts = LogFeederUtil
-      .getStringProperty("logfeeder.metrics.collector.hosts");
-    if (collectorHosts != null && collectorHosts.trim().length() == 0) {
-      collectorHosts = null;
-    }
-    if (collectorHosts != null) {
-      collectorHosts = collectorHosts.trim();
-    }
-    logger.info("AMS collector URL=" + collectorHosts);
-  }
-
-  @Override
-  public String getCollectorUri(String host) {
-    return collectorHosts;
-  }
-
-  @Override
-  protected int getTimeoutSeconds() {
-    // TODO: Hard coded timeout
-    return 10;
-  }
-
-  @Override
-  protected String getZookeeperQuorum() {
-    return null;
-  }
-
-  @Override
-  protected String getConfiguredCollectors() {
-    return null;
-  }
-
-  @Override
-  protected String getHostname() {
-    return null;
-  }
-
-  @Override
-  protected boolean emitMetrics(TimelineMetrics metrics) {
-    return super.emitMetrics(metrics);
-  }
-
-  @Override
-  protected String getCollectorProtocol() {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
deleted file mode 100644
index a86d989..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
+++ /dev/null
@@ -1,556 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.TimeZone;
-
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import com.google.common.collect.ObjectArrays;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * This class contains utility methods used by LogFeeder
- */
-public class LogFeederUtil {
-  private static final Logger logger = Logger.getLogger(LogFeederUtil.class);
-
-  private static final int HASH_SEED = 31174077;
-  public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
-  public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-  private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
-
-  private static Properties props;
-
-  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
-  private static int logInterval = 30000; // 30 seconds
-
-  public static String hostName = null;
-  public static String ipAddress = null;
-  
-  private static String logfeederTempDir = null;
-  
-  private static final Object _LOCK = new Object();
-  
-  static{
-    setHostNameAndIP();
-  }
-  
-  public static Gson getGson() {
-    return gson;
-  }
-
-  private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
-    @Override
-    protected SimpleDateFormat initialValue() {
-      SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
-      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-      return sdf;
-    }
-  };
-
-  /**
-   * This method will read the properties from System, followed by propFile
-   * and finally from the map
-   */
-  public static void loadProperties(String propFile, String[] propNVList)
-    throws Exception {
-    logger.info("Loading properties. propFile=" + propFile);
-    props = new Properties(System.getProperties());
-    boolean propLoaded = false;
-
-    // First get properties file path from environment value
-    String propertiesFilePath = System.getProperty("properties");
-    if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
-      File propertiesFile = new File(propertiesFilePath);
-      if (propertiesFile.exists() && propertiesFile.isFile()) {
-        logger.info("Properties file path set in environment. Loading properties file="
-          + propertiesFilePath);
-        FileInputStream fileInputStream = null;
-        try {
-          fileInputStream = new FileInputStream(propertiesFile);
-          props.load(fileInputStream);
-          propLoaded = true;
-        } catch (Throwable t) {
-          logger.error("Error loading properties file. properties file="
-            + propertiesFile.getAbsolutePath());
-        } finally {
-          if (fileInputStream != null) {
-            try {
-              fileInputStream.close();
-            } catch (Throwable t) {
-              // Ignore error
-            }
-          }
-        }
-      } else {
-        logger.error("Properties file path set in environment, but file not found. properties file="
-          + propertiesFilePath);
-      }
-    }
-
-    if (!propLoaded) {
-      BufferedInputStream fileInputStream = null;
-      try {
-        // Properties not yet loaded, let's try from class loader
-        fileInputStream = (BufferedInputStream) LogFeeder.class
-          .getClassLoader().getResourceAsStream(propFile);
-        if (fileInputStream != null) {
-          logger.info("Loading properties file " + propFile
-            + " from classpath");
-          props.load(fileInputStream);
-          propLoaded = true;
-        } else {
-          logger.fatal("Properties file not found in classpath. properties file name= "
-            + propFile);
-        }
-      } finally {
-        if (fileInputStream != null) {
-          try {
-            fileInputStream.close();
-          } catch (IOException e) {
-          }
-        }
-      }
-    }
-
-    if (!propLoaded) {
-      logger.fatal("Properties file is not loaded.");
-      throw new Exception("Properties not loaded");
-    } else {
-      updatePropertiesFromMap(propNVList);
-    }
-  }
-
-  private static void updatePropertiesFromMap(String[] nvList) {
-    if (nvList == null) {
-      return;
-    }
-    logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
-      + nvList.length);
-    if (nvList != null && nvList.length > 0) {
-      for (String nv : nvList) {
-        logger.info("Passed nv=" + nv);
-        if (nv.startsWith("-") && nv.length() > 1) {
-          nv = nv.substring(1);
-          logger.info("Stripped nv=" + nv);
-          int i = nv.indexOf("=");
-          if (nv.length() > i) {
-            logger.info("Candidate nv=" + nv);
-            String name = nv.substring(0, i);
-            String value = nv.substring(i + 1);
-            logger.info("Adding property from argument to properties. name="
-              + name + ", value=" + value);
-            props.put(name, value);
-          }
-        }
-      }
-    }
-  }
-
-  static public String getStringProperty(String key) {
-    if (props != null) {
-      return props.getProperty(key);
-    }
-    return null;
-  }
-
-  static public String getStringProperty(String key, String defaultValue) {
-    if (props != null) {
-      return props.getProperty(key, defaultValue);
-    }
-    return defaultValue;
-  }
-
-  static public boolean getBooleanProperty(String key, boolean defaultValue) {
-    String strValue = getStringProperty(key);
-    return toBoolean(strValue, defaultValue);
-  }
-
-  private static boolean toBoolean(String strValue, boolean defaultValue) {
-    boolean retValue = defaultValue;
-    if (!StringUtils.isEmpty(strValue)) {
-      if (strValue.equalsIgnoreCase("true")
-        || strValue.equalsIgnoreCase("yes")) {
-        retValue = true;
-      } else {
-        retValue = false;
-      }
-    }
-    return retValue;
-  }
-
-  static public int getIntProperty(String key, int defaultValue) {
-    String strValue = getStringProperty(key);
-    int retValue = defaultValue;
-    retValue = objectToInt(strValue, retValue, ", key=" + key);
-    return retValue;
-  }
-
-  public static int objectToInt(Object objValue, int retValue,
-                                String errMessage) {
-    if (objValue == null) {
-      return retValue;
-    }
-    String strValue = objValue.toString();
-    if (!StringUtils.isEmpty(strValue)) {
-      try {
-        retValue = Integer.parseInt(strValue);
-      } catch (Throwable t) {
-        logger.error("Error parsing integer value. str=" + strValue
-          + ", " + errMessage);
-      }
-    }
-    return retValue;
-  }
-
-  public static boolean isEnabled(Map<String, Object> conditionConfigs,
-                                  Map<String, Object> valueConfigs) {
-    boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
-    @SuppressWarnings("unchecked")
-    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
-      .get("conditions");
-    if (conditions != null && conditions.size() > 0) {
-      allow = false;
-      for (String conditionType : conditions.keySet()) {
-        if (conditionType.equalsIgnoreCase("fields")) {
-          @SuppressWarnings("unchecked")
-          Map<String, Object> fields = (Map<String, Object>) conditions
-            .get("fields");
-          for (String fieldName : fields.keySet()) {
-            Object values = fields.get(fieldName);
-            if (values instanceof String) {
-              allow = isFieldConditionMatch(valueConfigs,
-                fieldName, (String) values);
-            } else {
-              @SuppressWarnings("unchecked")
-              List<String> listValues = (List<String>) values;
-              for (String stringValue : listValues) {
-                allow = isFieldConditionMatch(valueConfigs,
-                  fieldName, stringValue);
-                if (allow) {
-                  break;
-                }
-              }
-            }
-            if (allow) {
-              break;
-            }
-          }
-        }
-        if (allow) {
-          break;
-        }
-      }
-    }
-    return allow;
-  }
-
-  public static boolean isFieldConditionMatch(Map<String, Object> configs,
-                                              String fieldName, String stringValue) {
-    boolean allow = false;
-    String fieldValue = (String) configs.get(fieldName);
-    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
-      allow = true;
-    } else {
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) configs
-        .get("add_fields");
-      if (addFields != null && addFields.get(fieldName) != null) {
-        String addFieldValue = (String) addFields.get(fieldName);
-        if (stringValue.equalsIgnoreCase(addFieldValue)) {
-          allow = true;
-        }
-      }
-
-    }
-    return allow;
-  }
-
-  public static void logStatForMetric(MetricCount metric, String prefixStr,
-                                      String postFix) {
-    long currStat = metric.count;
-    long currMS = System.currentTimeMillis();
-    if (currStat > metric.prevLogCount) {
-      if (postFix == null) {
-        postFix = "";
-      }
-      logger.info(prefixStr + ": total_count=" + metric.count
-        + ", duration=" + (currMS - metric.prevLogMS) / 1000
-        + " secs, count=" + (currStat - metric.prevLogCount)
-        + postFix);
-    }
-    metric.prevLogCount = currStat;
-    metric.prevLogMS = currMS;
-  }
-
-  public static Map<String, Object> cloneObject(Map<String, Object> map) {
-    if (map == null) {
-      return null;
-    }
-    String jsonStr = gson.toJson(map);
-    Type type = new TypeToken<Map<String, Object>>() {
-    }.getType();
-    return gson.fromJson(jsonStr, type);
-  }
-
-  public static Map<String, Object> toJSONObject(String jsonStr) {
-    if(jsonStr==null || jsonStr.trim().isEmpty()){
-      return new HashMap<String, Object>();
-    }
-    Type type = new TypeToken<Map<String, Object>>() {
-    }.getType();
-    return gson.fromJson(jsonStr, type);
-  }
-
-  static public boolean logErrorMessageByInterval(String key, String message,
-                                                  Throwable e, Logger callerLogger, Level level) {
-
-    LogHistory log = logHistoryList.get(key);
-    if (log == null) {
-      log = new LogHistory();
-      logHistoryList.put(key, log);
-    }
-    if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
-      log.lastLogTime = System.currentTimeMillis();
-      int counter = log.counter;
-      log.counter = 0;
-      if (counter > 0) {
-        message += ". Messages suppressed before: " + counter;
-      }
-      if (e == null) {
-        callerLogger.log(level, message);
-      } else {
-        callerLogger.log(level, message, e);
-      }
-
-      return true;
-    } else {
-      log.counter++;
-    }
-    return false;
-
-  }
-
-  static public String subString(String str, int maxLength) {
-    if (str == null || str.length() == 0) {
-      return "";
-    }
-    maxLength = str.length() < maxLength ? str.length() : maxLength;
-    return str.substring(0, maxLength);
-  }
-
-  public static long genHash(String value) {
-    if (value == null) {
-      value = "null";
-    }
-    return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
-  }
-
-  private static class LogHistory {
-    private long lastLogTime = 0;
-    private int counter = 0;
-  }
-
-  public static String getDate(String timeStampStr) {
-    try {
-      return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
-    } catch (Exception ex) {
-      logger.error(ex);
-      return null;
-    }
-  }
-
-  public static String getActualDateStr() {
-    try {
-      return dateFormatter.get().format(new Date());
-    } catch (Exception ex) {
-      logger.error(ex);
-      return null;
-    }
-  }
-
-  public static File getFileFromClasspath(String filename) {
-    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
-      .getResource(filename);
-    logger.debug("File Complete URI :" + fileCompleteUrl);
-    File file = null;
-    try {
-      file = new File(fileCompleteUrl.toURI());
-    } catch (Exception exception) {
-      logger.debug(exception.getMessage(), exception.getCause());
-    }
-    return file;
-  }
-
-  public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
-    Object instance = null;
-    try {
-      instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
-    } catch (Exception exception) {
-      logger.error("Unsupported class =" + classFullName, exception.getCause());
-    }
-    // check instance class as par aliasType
-    if (instance != null) {
-      boolean isValid = false;
-      switch (aliasType) {
-        case FILTER:
-          isValid = Filter.class.isAssignableFrom(instance.getClass());
-          break;
-        case INPUT:
-          isValid = Input.class.isAssignableFrom(instance.getClass());
-          break;
-        case OUTPUT:
-          isValid = Output.class.isAssignableFrom(instance.getClass());
-          break;
-        case MAPPER:
-          isValid = Mapper.class.isAssignableFrom(instance.getClass());
-          break;
-        default:
-          // by default consider all are valid class
-          isValid = true;
-      }
-      if (!isValid) {
-        logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
-      }
-    }
-    return instance;
-  }
-
-  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
-      });
-      return jsonmap;
-    } catch (JsonParseException e) {
-      logger.error(e, e.getCause());
-    } catch (JsonMappingException e) {
-      logger.error(e, e.getCause());
-    } catch (IOException e) {
-      logger.error(e, e.getCause());
-    }
-    return new HashMap<String, Object>();
-  }
-
-  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
-    if (list != null) {
-      for (String value : list) {
-        if (value != null) {
-          if (caseSensitive) {
-            if (value.equals(str)) {
-              return true;
-            }
-          } else {
-            if (value.equalsIgnoreCase(str)) {
-              return true;
-            }
-          }
-          if (value.equalsIgnoreCase(LogFeederConstants.ALL)) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-  
-  
-  private static synchronized String setHostNameAndIP() {
-    if (hostName == null || ipAddress == null) {
-      try {
-        InetAddress ip = InetAddress.getLocalHost();
-        ipAddress = ip.getHostAddress();
-        String getHostName = ip.getHostName();
-        String getCanonicalHostName = ip.getCanonicalHostName();
-        if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
-          logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
-          hostName = getCanonicalHostName;
-        } else {
-          logger.info("Using getHostName()=" + getHostName);
-          hostName = getHostName;
-        }
-        logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
-            + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
-            + hostName);
-      } catch (UnknownHostException e) {
-        logger.error("Error getting hostname.", e);
-      }
-    }
-    return hostName;
-  }
-
-  public static String[] mergeArray(String[] first, String[] second) {
-    if (first == null) {
-      first = new String[0];
-    }
-    if (second == null) {
-      second = new String[0];
-    }
-    String[] mergedArray = ObjectArrays.concat(first, second, String.class);
-    return mergedArray;
-  }
-  
-  public static String getLogfeederTempDir() {
-    if (logfeederTempDir == null) {
-      synchronized (_LOCK) {
-        if (logfeederTempDir == null) {
-          String tempDirValue = getStringProperty("logfeeder.tmp.dir",
-              "/tmp/$username/logfeeder/");
-          HashMap<String, String> contextParam = new HashMap<String, String>();
-          String username = System.getProperty("user.name");
-          contextParam.put("username", username);
-          logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
-              contextParam);
-        }
-      }
-    }
-    return logfeederTempDir;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
deleted file mode 100644
index 9bb1564..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-public class MetricCount {
-  public String metricsName = null;
-  public boolean isPointInTime = false;
-
-  public long count = 0;
-  public long prevLogCount = 0;
-  public long prevLogMS = System.currentTimeMillis();
-  public long prevPublishCount = 0;
-  public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
deleted file mode 100644
index b2a7786..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.log4j.Logger;
-
-public class MetricsMgr {
-  private static final Logger logger = Logger.getLogger(MetricsMgr.class);
-
-  private boolean isMetricsEnabled = false;
-  private String nodeHostName = null;
-  private String appId = "logfeeder";
-
-  private long lastPublishTimeMS = 0; // Let's do the first publish immediately
-  private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
-
-  private int publishIntervalMS = 60 * 1000;
-  private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
-  // the metrics in memory forever
-  private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
-  private LogFeederAMSClient amsClient = null;
-
-  public void init() {
-    logger.info("Initializing MetricsMgr()");
-    amsClient = new LogFeederAMSClient();
-
-    if (amsClient.getCollectorUri(null) != null) {
-      nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
-      if (nodeHostName == null) {
-        try {
-          nodeHostName = InetAddress.getLocalHost().getHostName();
-        } catch (Throwable e) {
-          logger.warn(
-            "Error getting hostname using InetAddress.getLocalHost().getHostName()",
-            e);
-        }
-        if (nodeHostName == null) {
-          try {
-            nodeHostName = InetAddress.getLocalHost()
-              .getCanonicalHostName();
-          } catch (Throwable e) {
-            logger.warn(
-              "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
-              e);
-          }
-        }
-      }
-      if (nodeHostName == null) {
-        isMetricsEnabled = false;
-        logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
-      } else {
-        isMetricsEnabled = true;
-        logger.info("LogFeeder Metrics is enabled. Metrics host="
-          + amsClient.getCollectorUri(null));
-      }
-    } else {
-      logger.info("LogFeeder Metrics publish is disabled");
-    }
-  }
-
-  public boolean isMetricsEnabled() {
-    return isMetricsEnabled;
-  }
-
-  synchronized public void useMetrics(List<MetricCount> metricsList) {
-    if (!isMetricsEnabled) {
-      return;
-    }
-    logger.info("useMetrics() metrics.size=" + metricsList.size());
-    long currMS = System.currentTimeMillis();
-    Long currMSLong = new Long(currMS);
-    for (MetricCount metric : metricsList) {
-      if (metric.metricsName == null) {
-        logger.debug("metric.metricsName is null");
-        // Metrics is not meant to be published
-        continue;
-      }
-      long currCount = metric.count;
-      if (!metric.isPointInTime && metric.publishCount > 0
-        && currCount <= metric.prevPublishCount) {
-        // No new data added, so let's ignore it
-        logger.debug("Nothing changed. " + metric.metricsName
-          + ", currCount=" + currCount + ", prevPublishCount="
-          + metric.prevPublishCount);
-        continue;
-      }
-      metric.publishCount++;
-
-      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
-      if (timelineMetric == null) {
-        logger.debug("Creating new metric obbject for "
-          + metric.metricsName);
-        // First time for this metric
-        timelineMetric = new TimelineMetric();
-        timelineMetric.setMetricName(metric.metricsName);
-        timelineMetric.setHostName(nodeHostName);
-        timelineMetric.setAppId(appId);
-        timelineMetric.setStartTime(currMS);
-        timelineMetric.setType("Long");
-        timelineMetric.setMetricValues(new TreeMap<Long, Double>());
-
-        metricsMap.put(metric.metricsName, timelineMetric);
-      }
-      logger.debug("Adding metrics=" + metric.metricsName);
-      if (metric.isPointInTime) {
-        timelineMetric.getMetricValues().put(currMSLong,
-          new Double(currCount));
-      } else {
-        Double value = timelineMetric.getMetricValues().get(currMSLong);
-        if (value == null) {
-          value = new Double(0);
-        }
-        value += (currCount - metric.prevPublishCount);
-        timelineMetric.getMetricValues().put(currMSLong, value);
-        metric.prevPublishCount = currCount;
-      }
-    }
-
-    if (metricsMap.size() > 0
-      && currMS - lastPublishTimeMS > publishIntervalMS) {
-      try {
-        // Time to publish
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
-        timeLineMetricList.addAll(metricsMap.values());
-        timelineMetrics.setMetrics(timeLineMetricList);
-        amsClient.emitMetrics(timelineMetrics);
-        logger.info("Published " + timeLineMetricList.size()
-          + " metrics to AMS");
-        metricsMap.clear();
-        timeLineMetricList.clear();
-        lastPublishTimeMS = currMS;
-      } catch (Throwable t) {
-        logger.warn("Error sending metrics to AMS.", t);
-        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
-          logger.error("AMS was not sent for last "
-            + maxMetricsBuffer
-            / 1000
-            + " seconds. Purging it and will start rebuilding it again");
-          metricsMap.clear();
-          lastFailedPublishTimeMS = currMS;
-        }
-      }
-    } else {
-      logger.info("Not publishing metrics. metrics.size()="
-        + metricsMap.size() + ", lastPublished="
-        + (currMS - lastPublishTimeMS) / 1000
-        + " seconds ago, intervalConfigured=" + publishIntervalMS
-        / 1000);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
deleted file mode 100644
index 2a54f28..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import com.google.common.primitives.Ints;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * This is a very fast, non-cryptographic hash suitable for general hash-based
- * lookup.  See http://murmurhash.googlepages.com/ for more details.
- * <p/>
- * <p>The C version of MurmurHash 2.0 found at that site was ported
- * to Java by Andrzej Bialecki (ab at getopt org).</p>
- */
-public final class MurmurHash {
-
-  private MurmurHash() {
-  }
-
-  /**
-   * Hashes an int.
-   *
-   * @param data The int to hash.
-   * @param seed The seed for the hash.
-   * @return The 32 bit hash of the bytes in question.
-   */
-  public static int hash(int data, int seed) {
-    return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
-  }
-
-  /**
-   * Hashes bytes in an array.
-   *
-   * @param data The bytes to hash.
-   * @param seed The seed for the hash.
-   * @return The 32 bit hash of the bytes in question.
-   */
-  public static int hash(byte[] data, int seed) {
-    return hash(ByteBuffer.wrap(data), seed);
-  }
-
-  /**
-   * Hashes bytes in part of an array.
-   *
-   * @param data   The data to hash.
-   * @param offset Where to start munging.
-   * @param length How many bytes to process.
-   * @param seed   The seed to start with.
-   * @return The 32-bit hash of the data in question.
-   */
-  public static int hash(byte[] data, int offset, int length, int seed) {
-    return hash(ByteBuffer.wrap(data, offset, length), seed);
-  }
-
-  /**
-   * Hashes the bytes in a buffer from the current position to the limit.
-   *
-   * @param buf  The bytes to hash.
-   * @param seed The seed for the hash.
-   * @return The 32 bit murmur hash of the bytes in the buffer.
-   */
-  public static int hash(ByteBuffer buf, int seed) {
-    // save byte order for later restoration
-    ByteOrder byteOrder = buf.order();
-    buf.order(ByteOrder.LITTLE_ENDIAN);
-
-    int m = 0x5bd1e995;
-    int r = 24;
-
-    int h = seed ^ buf.remaining();
-
-    while (buf.remaining() >= 4) {
-      int k = buf.getInt();
-
-      k *= m;
-      k ^= k >>> r;
-      k *= m;
-
-      h *= m;
-      h ^= k;
-    }
-
-    if (buf.remaining() > 0) {
-      ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
-      // for big-endian version, use this first:
-      // finish.position(4-buf.remaining());
-      finish.put(buf).rewind();
-      h ^= finish.getInt();
-      h *= m;
-    }
-
-    h ^= h >>> 13;
-    h *= m;
-    h ^= h >>> 15;
-
-    buf.order(byteOrder);
-    return h;
-  }
-
-
-  public static long hash64A(byte[] data, int seed) {
-    return hash64A(ByteBuffer.wrap(data), seed);
-  }
-
-  public static long hash64A(byte[] data, int offset, int length, int seed) {
-    return hash64A(ByteBuffer.wrap(data, offset, length), seed);
-  }
-
-  public static long hash64A(ByteBuffer buf, int seed) {
-    ByteOrder byteOrder = buf.order();
-    buf.order(ByteOrder.LITTLE_ENDIAN);
-
-    long m = 0xc6a4a7935bd1e995L;
-    int r = 47;
-
-    long h = seed ^ (buf.remaining() * m);
-
-    while (buf.remaining() >= 8) {
-      long k = buf.getLong();
-
-      k *= m;
-      k ^= k >>> r;
-      k *= m;
-
-      h ^= k;
-      h *= m;
-    }
-
-    if (buf.remaining() > 0) {
-      ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
-      // for big-endian version, do this first:
-      // finish.position(8-buf.remaining());
-      finish.put(buf).rewind();
-      h ^= finish.getLong();
-      h *= m;
-    }
-
-    h ^= h >>> r;
-    h *= m;
-    h ^= h >>> r;
-
-    buf.order(byteOrder);
-    return h;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
deleted file mode 100644
index 41b005b..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-public class OutputMgr {
-  private static final Logger logger = Logger.getLogger(OutputMgr.class);
-
-  private Collection<Output> outputList = new ArrayList<Output>();
-
-  private boolean addMessageMD5 = true;
-
-  private int MAX_OUTPUT_SIZE = 32765; // 32766-1
-  private static long doc_counter = 0;
-  private MetricCount messageTruncateMetric = new MetricCount();
-
-  
-  public Collection<Output> getOutputList() {
-    return outputList;
-  }
-
-  public void setOutputList(Collection<Output> outputList) {
-    this.outputList = outputList;
-  }
-
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    Input input = inputMarker.input;
-
-    // Update the block with the context fields
-    for (Map.Entry<String, String> entry : input.getContextFields()
-      .entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null) {
-        jsonObj.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // TODO: Ideally most of the overrides should be configurable
-
-    // Add the input type
-    if (jsonObj.get("type") == null) {
-      jsonObj.put("type", input.getStringValue("type"));
-    }
-    if (jsonObj.get("path") == null && input.getFilePath() != null) {
-      jsonObj.put("path", input.getFilePath());
-    }
-    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
-      jsonObj.put("path", input.getStringValue("path"));
-    }
-
-    // Add host if required
-    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
-      jsonObj.put("host", LogFeederUtil.hostName);
-    }
-    // Add IP if required
-    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
-      jsonObj.put("ip", LogFeederUtil.ipAddress);
-    }
-    
-    //Add level
-    if (jsonObj.get("level") == null) {
-      jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-    }
-    if (input.isUseEventMD5() || input.isGenEventMD5()) {
-      String prefix = "";
-      Object logtimeObj = jsonObj.get("logtime");
-      if (logtimeObj != null) {
-        if (logtimeObj instanceof Date) {
-          prefix = "" + ((Date) logtimeObj).getTime();
-        } else {
-          prefix = logtimeObj.toString();
-        }
-      }
-      Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
-        .toJson(jsonObj));
-      if (input.isGenEventMD5()) {
-        jsonObj.put("event_md5", prefix + eventMD5.toString());
-      }
-      if (input.isUseEventMD5()) {
-        jsonObj.put("id", prefix + eventMD5.toString());
-      }
-    }
-
-    // jsonObj.put("@timestamp", new Date());
-    jsonObj.put("seq_num", new Long(doc_counter++));
-    if (jsonObj.get("id") == null) {
-      jsonObj.put("id", UUID.randomUUID().toString());
-    }
-    if (jsonObj.get("event_count") == null) {
-      jsonObj.put("event_count", new Integer(1));
-    }
-    if (inputMarker.lineNumber > 0) {
-      jsonObj.put("logfile_line_number", new Integer(
-        inputMarker.lineNumber));
-    }
-    if (jsonObj.containsKey("log_message")) {
-      // TODO: Let's check size only for log_message for now
-      String logMessage = (String) jsonObj.get("log_message");
-      if (logMessage != null
-        && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
-        messageTruncateMetric.count++;
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-          + "_MESSAGESIZE";
-        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-          "Message is too big. size="
-            + logMessage.getBytes().length + ", input="
-            + input.getShortDescription()
-            + ". Truncating to " + MAX_OUTPUT_SIZE
-            + ", first upto 100 characters="
-            + LogFeederUtil.subString(logMessage, 100),
-          null, logger, Level.WARN);
-        logMessage = new String(logMessage.getBytes(), 0,
-          MAX_OUTPUT_SIZE);
-        jsonObj.put("log_message", logMessage);
-        // Add error tags
-        @SuppressWarnings("unchecked")
-        List<String> tagsList = (List<String>) jsonObj.get("tags");
-        if (tagsList == null) {
-          tagsList = new ArrayList<String>();
-          jsonObj.put("tags", tagsList);
-        }
-        tagsList.add("error_message_truncated");
-
-      }
-      if (addMessageMD5) {
-        jsonObj.put("message_md5",
-          "" + LogFeederUtil.genHash(logMessage));
-      }
-    }
-    //check log is allowed to send output
-    if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
-      for (Output output : input.getOutputList()) {
-        try {
-          output.write(jsonObj, inputMarker);
-        } catch (Exception e) {
-          logger.error("Error writing. to " + output.getShortDescription(), e);
-        }
-      }
-    }
-  }
-
-  public void write(String jsonBlock, InputMarker inputMarker) {
-    //check log is allowed to send output
-    if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
-      for (Output output : inputMarker.input.getOutputList()) {
-        try {
-          output.write(jsonBlock, inputMarker);
-        } catch (Exception e) {
-          logger.error("Error writing. to " + output.getShortDescription(), e);
-        }
-      }
-    }
-  }
-
-  public void close() {
-    logger.info("Close called for outputs ...");
-    for (Output output : outputList) {
-      try {
-        output.setDrain(true);
-        output.close();
-      } catch (Exception e) {
-        // Ignore
-      }
-    }
-    // Need to get this value from property
-    int iterations = 30;
-    int waitTimeMS = 1000;
-    int i;
-    boolean allClosed = true;
-    for (i = 0; i < iterations; i++) {
-      allClosed = true;
-      for (Output output : outputList) {
-        if (!output.isClosed()) {
-          try {
-            allClosed = false;
-            logger.warn("Waiting for output to close. "
-              + output.getShortDescription() + ", "
-              + (iterations - i) + " more seconds");
-            Thread.sleep(waitTimeMS);
-          } catch (Throwable t) {
-            // Ignore
-          }
-        }
-      }
-      if (allClosed) {
-        break;
-      }
-    }
-
-    if (!allClosed) {
-      logger.warn("Some outpus were not closed. Iterations=" + i);
-      for (Output output : outputList) {
-        if (!output.isClosed()) {
-          logger.warn("Output not closed. Will ignore it."
-            + output.getShortDescription() + ", pendingCound="
-            + output.getPendingCount());
-        }
-      }
-    } else {
-      logger.info("All outputs are closed. Iterations=" + i);
-    }
-  }
-
-  public void logStats() {
-    for (Output output : outputList) {
-      output.logStat();
-    }
-    LogFeederUtil.logStatForMetric(messageTruncateMetric,
-      "Stat: Messages Truncated", null);
-  }
-
-  public void addMetricsContainers(List<MetricCount> metricsList) {
-    metricsList.add(messageTruncateMetric);
-    for (Output output : outputList) {
-      output.addMetricsContainers(metricsList);
-    }
-  }
-
-  
-  public void copyFile(File inputFile, InputMarker inputMarker) {
-    Input input = inputMarker.input;
-    for (Output output : input.getOutputList()) {
-      try {
-        output.copyFile(inputFile, inputMarker);
-      }catch (Exception e) {
-        logger.error("Error coyping file . to " + output.getShortDescription(),
-            e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
new file mode 100644
index 0000000..287982f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+
+
+public abstract class ConfigBlock {
+  static private Logger logger = Logger.getLogger(ConfigBlock.class);
+
+  private boolean drain = false;
+
+  protected Map<String, Object> configs;
+  protected Map<String, String> contextFields = new HashMap<String, String>();
+  public MetricCount statMetric = new MetricCount();
+
+  /**
+   *
+   */
+  public ConfigBlock() {
+    super();
+  }
+
+  /**
+   * Used while logging. Keep it short and meaningful
+   */
+  public abstract String getShortDescription();
+
+  /**
+   * Every implementor need to give name to the thread they create
+   */
+  public String getNameForThread() {
+    return this.getClass().getSimpleName();
+  }
+
+  /**
+   * @param metricsList
+   */
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    metricsList.add(statMetric);
+  }
+
+  /**
+   * This method needs to be overwritten by deriving classes.
+   */
+  public void init() throws Exception {
+  }
+
+  public void loadConfig(Map<String, Object> map) {
+    configs = LogFeederUtil.cloneObject(map);
+
+    Map<String, String> nvList = getNVList("add_fields");
+    if (nvList != null) {
+      contextFields.putAll(nvList);
+    }
+  }
+
+  public Map<String, Object> getConfigs() {
+    return configs;
+  }
+
+  @SuppressWarnings("unchecked")
+  public boolean isEnabled() {
+    boolean isEnabled = getBooleanValue("is_enabled", true);
+    if (isEnabled) {
+      // Let's check for static conditions
+      Map<String, Object> conditions = (Map<String, Object>) configs
+        .get("conditions");
+      boolean allow = true;
+      if (conditions != null && conditions.size() > 0) {
+        allow = false;
+        for (String conditionType : conditions.keySet()) {
+          if (conditionType.equalsIgnoreCase("fields")) {
+            Map<String, Object> fields = (Map<String, Object>) conditions
+              .get("fields");
+            for (String fieldName : fields.keySet()) {
+              Object values = fields.get(fieldName);
+              if (values instanceof String) {
+                allow = isFieldConditionMatch(fieldName,
+                  (String) values);
+              } else {
+                List<String> listValues = (List<String>) values;
+                for (String stringValue : listValues) {
+                  allow = isFieldConditionMatch(fieldName,
+                    stringValue);
+                  if (allow) {
+                    break;
+                  }
+                }
+              }
+              if (allow) {
+                break;
+              }
+            }
+          }
+          if (allow) {
+            break;
+          }
+        }
+        isEnabled = allow;
+      }
+    }
+    return isEnabled;
+  }
+
+  public boolean isFieldConditionMatch(String fieldName, String stringValue) {
+    boolean allow = false;
+    String fieldValue = (String) configs.get(fieldName);
+    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
+      allow = true;
+    } else {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> addFields = (Map<String, Object>) configs
+        .get("add_fields");
+      if (addFields != null && addFields.get(fieldName) != null) {
+        String addFieldValue = (String) addFields.get(fieldName);
+        if (stringValue.equalsIgnoreCase(addFieldValue)) {
+          allow = true;
+        }
+      }
+
+    }
+    return allow;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<String, String> getNVList(String key) {
+    return (Map<String, String>) configs.get(key);
+  }
+
+  public String getStringValue(String key) {
+    Object value = configs.get(key);
+    if (value != null && value.toString().equalsIgnoreCase("none")) {
+      value = null;
+    }
+    if (value != null) {
+      return value.toString();
+    }
+    return null;
+  }
+
+  public String getStringValue(String key, String defaultValue) {
+    Object value = configs.get(key);
+    if (value != null && value.toString().equalsIgnoreCase("none")) {
+      value = null;
+    }
+
+    if (value != null) {
+      return value.toString();
+    }
+    return defaultValue;
+  }
+
+  public Object getConfigValue(String key) {
+    return configs.get(key);
+  }
+
+  public boolean getBooleanValue(String key, boolean defaultValue) {
+    String strValue = getStringValue(key);
+    boolean retValue = defaultValue;
+    if (!StringUtils.isEmpty(strValue)) {
+      if (strValue.equalsIgnoreCase("true")
+        || strValue.equalsIgnoreCase("yes")) {
+        retValue = true;
+      } else {
+        retValue = false;
+      }
+    }
+    return retValue;
+  }
+
+  public int getIntValue(String key, int defaultValue) {
+    String strValue = getStringValue(key);
+    int retValue = defaultValue;
+    if (!StringUtils.isEmpty(strValue)) {
+      try {
+        retValue = Integer.parseInt(strValue);
+      } catch (Throwable t) {
+        logger.error("Error parsing integer value. key=" + key
+          + ", value=" + strValue);
+      }
+    }
+    return retValue;
+  }
+  
+  public long getLongValue(String key, long defaultValue) {
+    String strValue = getStringValue(key);
+    Long retValue = defaultValue;
+    if (!StringUtils.isEmpty(strValue)) {
+      try {
+        retValue = Long.parseLong(strValue);
+      } catch (Throwable t) {
+        logger.error("Error parsing long value. key=" + key + ", value="
+            + strValue);
+      }
+    }
+    return retValue;
+  }
+
+  public Map<String, String> getContextFields() {
+    return contextFields;
+  }
+
+  public void incrementStat(int count) {
+    statMetric.count += count;
+  }
+
+  public void logStatForMetric(MetricCount metric, String prefixStr) {
+    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key="
+      + getShortDescription());
+  }
+
+  synchronized public void logStat() {
+    logStatForMetric(statMetric, "Stat");
+  }
+
+  public boolean logConfgs(Priority level) {
+    if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) {
+      return false;
+    }
+    if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) {
+      return false;
+    }
+    logger.log(level, "Printing configuration Block="
+      + getShortDescription());
+    logger.log(level, "configs=" + configs);
+    logger.log(level, "contextFields=" + contextFields);
+    return true;
+  }
+
+  public boolean isDrain() {
+    return drain;
+  }
+
+  public void setDrain(boolean drain) {
+    this.drain = drain;
+  }
+}

Reply via email to