AMBARI-21623 Log Search Config should be separated into a Server and Log Feeder 
interface (mgergely)

Change-Id: Ie40cf3b57470c08375124d547dade2c9a3204e9f


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2d1ac668
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2d1ac668
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2d1ac668

Branch: refs/heads/branch-feature-AMBARI-14714
Commit: 2d1ac668b6ae272481668d08f4b7bd1c1b99fc45
Parents: 27386c3
Author: Miklos Gergely <mgerg...@hortonworks.com>
Authored: Fri Aug 4 11:27:57 2017 +0200
Committer: Miklos Gergely <mgerg...@hortonworks.com>
Committed: Fri Aug 4 11:27:57 2017 +0200

----------------------------------------------------------------------
 .../logsearch/config/api/LogSearchConfig.java   | 131 ---------
 .../config/api/LogSearchConfigFactory.java      |  61 +++-
 .../config/api/LogSearchConfigLogFeeder.java    |  77 +++++
 .../config/api/LogSearchConfigServer.java       | 111 +++++++
 .../config/api/LogSearchConfigClass1.java       |  95 ------
 .../config/api/LogSearchConfigClass2.java       |  95 ------
 .../config/api/LogSearchConfigFactoryTest.java  |  51 +++-
 .../api/LogSearchConfigLogFeederClass1.java     |  58 ++++
 .../api/LogSearchConfigLogFeederClass2.java     |  58 ++++
 .../config/api/LogSearchConfigServerClass1.java |  76 +++++
 .../config/api/LogSearchConfigServerClass2.java |  76 +++++
 .../zookeeper/LogSearchConfigLogFeederZK.java   | 228 +++++++++++++++
 .../zookeeper/LogSearchConfigServerZK.java      | 138 +++++++++
 .../config/zookeeper/LogSearchConfigZK.java     | 291 +------------------
 .../org/apache/ambari/logfeeder/LogFeeder.java  |  11 +-
 .../ambari/logfeeder/common/ConfigHandler.java  |   6 +-
 .../logfeeder/input/InputConfigUploader.java    |  10 +-
 .../apache/ambari/logfeeder/output/Output.java  |   6 +-
 .../ambari/logfeeder/output/OutputSolrTest.java |   6 +-
 .../configurer/LogSearchConfigConfigurer.java   |  13 +-
 .../configurer/SolrCollectionConfigurer.java    |   3 +-
 .../ambari/logsearch/dao/SolrDaoBase.java       |  11 +-
 .../handler/CreateCollectionHandler.java        |   7 +-
 .../logsearch/manager/ShipperConfigManager.java |  18 +-
 .../logsearch/model/common/LSServerFilter.java  |  10 +-
 25 files changed, 970 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index 76be392..6c3b910 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -20,80 +20,14 @@
 package org.apache.ambari.logsearch.config.api;
 
 import java.io.Closeable;
-import java.util.List;
-import java.util.Map;
 
 import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
-import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 /**
  * Log Search Configuration, which uploads, retrieves configurations, and 
monitors it's changes.
  */
 public interface LogSearchConfig extends Closeable {
   /**
-   * Enumeration of the components of the Log Search service.
-   */
-  public enum Component {
-    SERVER, LOGFEEDER;
-  }
-
-  /**
-   * Initialization of the configuration.
-   * 
-   * @param component The component which will use the configuration.
-   * @param properties The properties of that component.
-   * @param clusterName The name of the cluster, only need to be specified in 
LOGFEEDER mode (null for SERVER mode).
-   * @throws Exception
-   */
-  void init(Component component, Map<String, String> properties, String 
clusterName) throws Exception;
-
-  /**
-   * Returns all the service names with input configurations of a cluster. 
Will be used only in SERVER mode.
-   * 
-   * @param clusterName The name of the cluster which's services are required.
-   * @return List of the service names.
-   */
-  List<String> getServices(String clusterName);
-
-  /**
-   * Checks if input configuration exists. Will be used only in LOGFEEDER mode.
-   * 
-   * @param serviceName The name of the service looked for.
-   * @return If input configuration exists for the service.
-   * @throws Exception
-   */
-  boolean inputConfigExistsLogFeeder(String serviceName) throws Exception;
-
-  /**
-   * Checks if input configuration exists. Will be used only in SERVER mode.
-   * 
-   * @param clusterName The name of the cluster where the service is looked 
for.
-   * @param serviceName The name of the service looked for.
-   * @return If input configuration exists for the service.
-   * @throws Exception
-   */
-  boolean inputConfigExistsServer(String clusterName, String serviceName) 
throws Exception;
-
-  /**
-   * Returns the global configurations of a cluster. Will be used only in 
SERVER mode.
-   * 
-   * @param clusterName The name of the cluster where the service is looked 
for.
-   * @return The global configurations of the cluster if it exists, null 
otherwise.
-   */
-  String getGlobalConfigs(String clusterName);
-
-  /**
-   * Returns the input configuration of a service in a cluster. Will be used 
only in SERVER mode.
-   * 
-   * @param clusterName The name of the cluster where the service is looked 
for.
-   * @param serviceName The name of the service looked for.
-   * @return The input configuration for the service if it exists, null 
otherwise.
-   */
-  InputConfig getInputConfig(String clusterName, String serviceName);
-
-  /**
    * Uploads the input configuration for a service in a cluster.
    * 
    * @param clusterName The name of the cluster where the service is.
@@ -104,16 +38,6 @@ public interface LogSearchConfig extends Closeable {
   void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception;
 
   /**
-   * Modifies the input configuration for a service in a cluster.
-   * 
-   * @param clusterName The name of the cluster where the service is.
-   * @param serviceName The name of the service of which's input configuration 
is uploaded.
-   * @param inputConfig The input configuration of the service.
-   * @throws Exception
-   */
-  void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception;
-
-  /**
    * Uploads the log level filter of a log.
    * 
    * @param clusterName The name of the cluster where the log is.
@@ -122,59 +46,4 @@ public interface LogSearchConfig extends Closeable {
    * @throws Exception 
    */
   void createLogLevelFilter(String clusterName, String logId, LogLevelFilter 
filter) throws Exception;
-
-  /**
-   * Modifies the log level filters for all the logs.
-   * 
-   * @param clusterName The name of the cluster where the logs are.
-   * @param filters The log level filters to set.
-   * @throws Exception
-   */
-  void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) 
throws Exception;
-
-  /**
-   * Returns the Log Level Filters of a cluster.
-   * 
-   * @param clusterName The name of the cluster which's log level filters are 
required.
-   * @return All the log level filters of the cluster.
-   */
-  LogLevelFilterMap getLogLevelFilters(String clusterName);
-
-  /**
-   * Starts the monitoring of the input configurations, asynchronously. Will 
be used only in LOGFEEDER mode.
-   * 
-   * @param inputConfigMonitor The input config monitor to call in case of an 
input config change.
-   * @param logLevelFilterMonitor The log level filter monitor to call in case 
of a log level filter change.
-   * @param clusterName The name of the cluster, only need to be specified in 
LOGFEEDER mode (null for SERVER mode).
-   * @throws Exception
-   */
-  void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor,
-      String clusterName) throws Exception;
-
-  /**
-   * Saves the properties of an Output Solr. Will be used only in SERVER mode.
-   * 
-   * @param type The type of the Output Solr.
-   * @param outputSolrProperties The properties of the Output Solr.
-   * @throws Exception
-   */
-  void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception;
-
-  /**
-   * Get the properties of an Output Solr. Will be used only in LOGFEEDER mode.
-   * 
-   * @param type The type of the Output Solr.
-   * @return The properties of the Output Solr, or null if it doesn't exist.
-   * @throws Exception
-   */
-  OutputSolrProperties getOutputSolrProperties(String type) throws Exception;
-
-  /**
-   * Saves the properties of an Output Solr. Will be used only in LOGFEEDER 
mode.
-   * 
-   * @param type The type of the Output Solr.
-   * @param outputConfigMonitors The monitors which want to watch the output 
config changes.
-   * @throws Exception
-   */
-  void monitorOutputProperties(List<? extends OutputConfigMonitor> 
outputConfigMonitors) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
index 77b48eb..a84a97b 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
@@ -21,46 +21,81 @@ package org.apache.ambari.logsearch.config.api;
 
 import java.util.Map;
 
-import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Factory class for LogSearchConfig.
+ * Factory class for LogSearchConfigServer and LogSearchConfigLogFeeder.
  */
 public class LogSearchConfigFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(LogSearchConfigFactory.class);
 
   /**
-   * Creates a Log Search Configuration instance that implements {@link 
org.apache.ambari.logsearch.config.api.LogSearchConfig}.
+   * Creates a Log Search Configuration instance for the Log Search Server 
that implements
+   * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigServer}.
    * 
-   * @param component The component of the Log Search Service to create the 
configuration for (SERVER/LOGFEEDER).
    * @param properties The properties of the component for which the 
configuration is created. If the properties contain the
    *                  "logsearch.config.class" entry than the class defined 
there would be used instead of the default class.
-   * @param clusterName The name of the cluster, only need to be specified in 
LOGFEEDER mode (null for SERVER mode).
    * @param defaultClass The default configuration class to use if not 
specified otherwise.
    * @return The Log Search Configuration instance.
-   * @throws Exception Throws exception if the defined class does not 
implement LogSearchConfig, or doesn't have an empty
+   * @throws Exception Throws exception if the defined class does not 
implement LogSearchConfigServer, or doesn't have an empty
    *                   constructor, or throws an exception in it's init method.
    */
-  public static LogSearchConfig createLogSearchConfig(Component component, 
Map<String, String> properties, String clusterName,
-      Class<? extends LogSearchConfig> defaultClass) throws Exception {
+  public static LogSearchConfigServer createLogSearchConfigServer(Map<String, 
String> properties,
+      Class<? extends LogSearchConfigServer> defaultClass) throws Exception {
     try {
-      LogSearchConfig logSearchConfig = null;
-      String configClassName = properties.get("logsearch.config.class");
+      LogSearchConfigServer logSearchConfig = null;
+      String configClassName = properties.get("logsearch.config.server.class");
+      if (configClassName != null && !"".equals(configClassName.trim())) {
+        Class<?> clazz = Class.forName(configClassName);
+        if (LogSearchConfigServer.class.isAssignableFrom(clazz)) {
+          logSearchConfig = (LogSearchConfigServer) clazz.newInstance();
+        } else {
+          throw new IllegalArgumentException("Class " + configClassName + " 
does not implement the interface " +
+              LogSearchConfigServer.class.getName());
+        }
+      } else {
+        logSearchConfig = defaultClass.newInstance();
+      }
+      
+      logSearchConfig.init(properties);
+      return logSearchConfig;
+    } catch (Exception e) {
+      LOG.error("Could not initialize logsearch config.", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Creates a Log Search Configuration instance for the Log Search Server 
that implements
+   * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder}.
+   * 
+   * @param properties The properties of the component for which the 
configuration is created. If the properties contain the
+   *                  "logsearch.config.class" entry than the class defined 
there would be used instead of the default class.
+   * @param clusterName The name of the cluster.
+   * @param defaultClass The default configuration class to use if not 
specified otherwise.
+   * @return The Log Search Configuration instance.
+   * @throws Exception Throws exception if the defined class does not 
implement LogSearchConfigLogFeeder, or doesn't have an empty
+   *                   constructor, or throws an exception in it's init method.
+   */
+  public static LogSearchConfigLogFeeder 
createLogSearchConfigLogFeeder(Map<String, String> properties, String 
clusterName,
+      Class<? extends LogSearchConfigLogFeeder> defaultClass) throws Exception 
{
+    try {
+      LogSearchConfigLogFeeder logSearchConfig = null;
+      String configClassName = 
properties.get("logsearch.config.logfeeder.class");
       if (configClassName != null && !"".equals(configClassName.trim())) {
         Class<?> clazz = Class.forName(configClassName);
         if (LogSearchConfig.class.isAssignableFrom(clazz)) {
-          logSearchConfig = (LogSearchConfig) clazz.newInstance();
+          logSearchConfig = (LogSearchConfigLogFeeder) clazz.newInstance();
         } else {
           throw new IllegalArgumentException("Class " + configClassName + " 
does not implement the interface " +
-              LogSearchConfig.class.getName());
+              LogSearchConfigLogFeeder.class.getName());
         }
       } else {
         logSearchConfig = defaultClass.newInstance();
       }
       
-      logSearchConfig.init(component, properties, clusterName);
+      logSearchConfig.init(properties, clusterName);
       return logSearchConfig;
     } catch (Exception e) {
       LOG.error("Could not initialize logsearch config.", e);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
new file mode 100644
index 0000000..6ed36fd
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+
+/**
+ * Log Search Configuration for Log Feeder.
+ */
+public interface LogSearchConfigLogFeeder extends LogSearchConfig {
+  /**
+   * Initialization of the configuration.
+   * 
+   * @param properties The properties of that component.
+   * @param clusterName The name of the cluster.
+   * @throws Exception
+   */
+  void init(Map<String, String> properties, String clusterName) throws 
Exception;
+
+  /**
+   * Checks if input configuration exists.
+   * 
+   * @param serviceName The name of the service looked for.
+   * @return If input configuration exists for the service.
+   * @throws Exception
+   */
+  boolean inputConfigExists(String serviceName) throws Exception;
+
+  /**
+   * Starts the monitoring of the input configurations, asynchronously.
+   * 
+   * @param inputConfigMonitor The input config monitor to call in case of an 
input config change.
+   * @param logLevelFilterMonitor The log level filter monitor to call in case 
of a log level filter change.
+   * @param clusterName The name of the cluster.
+   * @throws Exception
+   */
+  void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor,
+      String clusterName) throws Exception;
+
+  /**
+   * Get the properties of an Output Solr.
+   * 
+   * @param type The type of the Output Solr.
+   * @return The properties of the Output Solr, or null if it doesn't exist.
+   * @throws Exception
+   */
+  OutputSolrProperties getOutputSolrProperties(String type) throws Exception;
+
+  /**
+   * Saves the properties of an Output Solr.
+   * 
+   * @param type The type of the Output Solr.
+   * @param outputConfigMonitors The monitors which want to watch the output 
config changes.
+   * @throws Exception
+   */
+  void monitorOutputProperties(List<? extends OutputConfigMonitor> 
outputConfigMonitors) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java
new file mode 100644
index 0000000..26889be
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+
+/**
+ * Log Search Configuration for Log Search Server.
+ */
+public interface LogSearchConfigServer extends LogSearchConfig {
+  /**
+   * Initialization of the configuration.
+   * 
+   * @param properties The properties of that component.
+   * @throws Exception
+   */
+  void init(Map<String, String> properties) throws Exception;
+
+  /**
+   * Returns all the service names with input configurations of a cluster.
+   * 
+   * @param clusterName The name of the cluster which's services are required.
+   * @return List of the service names.
+   */
+  List<String> getServices(String clusterName);
+
+  /**
+   * Checks if input configuration exists.
+   * 
+   * @param clusterName The name of the cluster where the service is looked 
for.
+   * @param serviceName The name of the service looked for.
+   * @return If input configuration exists for the service.
+   * @throws Exception
+   */
+  boolean inputConfigExists(String clusterName, String serviceName) throws 
Exception;
+
+  /**
+   * Returns the global configurations of a cluster.
+   * 
+   * @param clusterName The name of the cluster where the service is looked 
for.
+   * @return The global configurations of the cluster if it exists, null 
otherwise.
+   */
+  String getGlobalConfigs(String clusterName);
+
+  /**
+   * Modifies the input configuration for a service in a cluster.
+   * 
+   * @param clusterName The name of the cluster where the service is.
+   * @param serviceName The name of the service of which's input configuration 
is uploaded.
+   * @param inputConfig The input configuration of the service.
+   * @throws Exception
+   */
+  void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception;
+
+  /**
+   * Returns the input configuration of a service in a cluster.
+   * 
+   * @param clusterName The name of the cluster where the service is looked 
for.
+   * @param serviceName The name of the service looked for.
+   * @return The input configuration for the service if it exists, null 
otherwise.
+   */
+  InputConfig getInputConfig(String clusterName, String serviceName);
+
+  /**
+   * Modifies the log level filters for all the logs.
+   * 
+   * @param clusterName The name of the cluster where the logs are.
+   * @param filters The log level filters to set.
+   * @throws Exception
+   */
+  void setLogLevelFilters(String clusterName, LogLevelFilterMap filters) 
throws Exception;
+
+  /**
+   * Returns the Log Level Filters of a cluster.
+   * 
+   * @param clusterName The name of the cluster which's log level filters are 
required.
+   * @return All the log level filters of the cluster.
+   */
+  LogLevelFilterMap getLogLevelFilters(String clusterName);
+
+  /**
+   * Saves the properties of an Output Solr.
+   * 
+   * @param type The type of the Output Solr.
+   * @param outputSolrProperties The properties of the Output Solr.
+   * @throws Exception
+   */
+  void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
deleted file mode 100644
index e308346..0000000
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logsearch.config.api;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
-import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
-
-public class LogSearchConfigClass1 implements LogSearchConfig {
-  @Override
-  public void init(Component component, Map<String, String> properties, String 
clusterName) {}
-
-  @Override
-  public boolean inputConfigExistsLogFeeder(String serviceName) throws 
Exception {
-    return false;
-  }
-
-  @Override
-  public boolean inputConfigExistsServer(String clusterName, String 
serviceName) throws Exception {
-    return false;
-  }
-
-  @Override
-  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
-
-  @Override
-  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
-
-  @Override
-  public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor,
-      String clusterName) throws Exception {}
-
-  @Override
-  public List<String> getServices(String clusterName) {
-    return null;
-  }
-
-  @Override
-  public String getGlobalConfigs(String clusterName) {
-    return null;
-  }
-
-  @Override
-  public InputConfig getInputConfig(String clusterName, String serviceName) {
-    return null;
-  }
-
-  @Override
-  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
-
-  @Override
-  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {}
-
-  @Override
-  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
-    return null;
-  }
-
-  @Override
-  public void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception {}
-
-  @Override
-  public OutputSolrProperties getOutputSolrProperties(String type) {
-    return null;
-  }
-
-  @Override
-  public void monitorOutputProperties(List<? extends OutputConfigMonitor> 
outputConfigMonitors) throws Exception {}
-
-  @Override
-  public void close() {}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
deleted file mode 100644
index b64dae8..0000000
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logsearch.config.api;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
-import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
-
-public class LogSearchConfigClass2 implements LogSearchConfig {
-  @Override
-  public void init(Component component, Map<String, String> properties, String 
clusterName) {}
-
-  @Override
-  public boolean inputConfigExistsLogFeeder(String serviceName) throws 
Exception {
-    return false;
-  }
-
-  @Override
-  public boolean inputConfigExistsServer(String clusterName, String 
serviceName) throws Exception {
-    return false;
-  }
-
-  @Override
-  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
-
-  @Override
-  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
-
-  @Override
-  public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor,
-      String clusterName) throws Exception {}
-
-  @Override
-  public List<String> getServices(String clusterName) {
-    return null;
-  }
-
-  @Override
-  public String getGlobalConfigs(String clusterName) {
-    return null;
-  }
-
-  @Override
-  public InputConfig getInputConfig(String clusterName, String serviceName) {
-    return null;
-  }
-
-  @Override
-  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
-
-  @Override
-  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {}
-
-  @Override
-  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
-    return null;
-  }
-
-  @Override
-  public void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception {}
-
-  @Override
-  public OutputSolrProperties getOutputSolrProperties(String type) {
-    return null;
-  }
-
-  @Override
-  public void monitorOutputProperties(List<? extends OutputConfigMonitor> 
outputConfigMonitors) throws Exception {}
-
-  @Override
-  public void close() {}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
index f990c5c..d0db87f 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -31,28 +30,52 @@ import junit.framework.Assert;
 public class LogSearchConfigFactoryTest {
 
   @Test
-  public void testDefaultConfig() throws Exception {
-    LogSearchConfig config = 
LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
-        Collections.<String, String> emptyMap(), null, 
LogSearchConfigClass1.class);
+  public void testDefaultConfigServer() throws Exception {
+    LogSearchConfigServer config = 
LogSearchConfigFactory.createLogSearchConfigServer( Collections.<String, 
String> emptyMap(),
+        LogSearchConfigServerClass1.class);
     
-    Assert.assertSame(config.getClass(), LogSearchConfigClass1.class);
+    Assert.assertSame(config.getClass(), LogSearchConfigServerClass1.class);
   }
 
   @Test
-  public void testCustomConfig() throws Exception {
+  public void testCustomConfigServer() throws Exception {
     Map<String, String> logsearchConfClassMap = new HashMap<>();
-    logsearchConfClassMap.put("logsearch.config.class", 
"org.apache.ambari.logsearch.config.api.LogSearchConfigClass2");
-    LogSearchConfig config = 
LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
-      logsearchConfClassMap, null, LogSearchConfigClass1.class);
+    logsearchConfClassMap.put("logsearch.config.server.class", 
"org.apache.ambari.logsearch.config.api.LogSearchConfigServerClass2");
+    LogSearchConfig config = 
LogSearchConfigFactory.createLogSearchConfigServer(logsearchConfClassMap,
+        LogSearchConfigServerClass1.class);
     
-    Assert.assertSame(config.getClass(), LogSearchConfigClass2.class);
+    Assert.assertSame(config.getClass(), LogSearchConfigServerClass2.class);
   }
   
   @Test(expected = IllegalArgumentException.class)
-  public void testNonConfigClass() throws Exception {
+  public void testNonConfigClassServer() throws Exception {
     Map<String, String> logsearchConfClassMap = new HashMap<>();
-    logsearchConfClassMap.put("logsearch.config.class", 
"org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass");
-    LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
-      logsearchConfClassMap, null, LogSearchConfigClass1.class);
+    logsearchConfClassMap.put("logsearch.config.server.class", 
"org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass");
+    LogSearchConfigFactory.createLogSearchConfigServer(logsearchConfClassMap, 
LogSearchConfigServerClass1.class);
+  }
+
+  @Test
+  public void testDefaultConfigLogFeeder() throws Exception {
+    LogSearchConfigLogFeeder config = 
LogSearchConfigFactory.createLogSearchConfigLogFeeder( Collections.<String, 
String> emptyMap(),
+        null, LogSearchConfigLogFeederClass1.class);
+    
+    Assert.assertSame(config.getClass(), LogSearchConfigLogFeederClass1.class);
+  }
+
+  @Test
+  public void testCustomConfigLogFeeder() throws Exception {
+    Map<String, String> logsearchConfClassMap = new HashMap<>();
+    logsearchConfClassMap.put("logsearch.config.logfeeder.class", 
"org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeederClass2");
+    LogSearchConfigLogFeeder config = 
LogSearchConfigFactory.createLogSearchConfigLogFeeder(logsearchConfClassMap, 
null,
+        LogSearchConfigLogFeederClass1.class);
+    
+    Assert.assertSame(config.getClass(), LogSearchConfigLogFeederClass2.class);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testNonConfigClassLogFeeder() throws Exception {
+    Map<String, String> logsearchConfClassMap = new HashMap<>();
+    logsearchConfClassMap.put("logsearch.config.logfeeder.class", 
"org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass");
+    
LogSearchConfigFactory.createLogSearchConfigLogFeeder(logsearchConfClassMap, 
null, LogSearchConfigLogFeederClass1.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java
new file mode 100644
index 0000000..b7da7fe
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass1.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+
+public class LogSearchConfigLogFeederClass1 implements 
LogSearchConfigLogFeeder {
+  @Override
+  public void init(Map<String, String> properties, String clusterName) {}
+
+  @Override
+  public boolean inputConfigExists(String serviceName) throws Exception {
+    return false;
+  }
+
+  @Override
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
+  public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor,
+      String clusterName) throws Exception {}
+
+  @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
+
+  @Override
+  public OutputSolrProperties getOutputSolrProperties(String type) {
+    return null;
+  }
+
+  @Override
+  public void monitorOutputProperties(List<? extends OutputConfigMonitor> 
outputConfigMonitors) throws Exception {}
+
+  @Override
+  public void close() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java
new file mode 100644
index 0000000..b703bf8
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeederClass2.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+
+public class LogSearchConfigLogFeederClass2 implements 
LogSearchConfigLogFeeder {
+  @Override
+  public void init(Map<String, String> properties, String clusterName) {}
+
+  @Override
+  public boolean inputConfigExists(String serviceName) throws Exception {
+    return false;
+  }
+
+  @Override
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
+  public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, 
LogLevelFilterMonitor logLevelFilterMonitor,
+      String clusterName) throws Exception {}
+
+  @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
+
+  @Override
+  public OutputSolrProperties getOutputSolrProperties(String type) {
+    return null;
+  }
+
+  @Override
+  public void monitorOutputProperties(List<? extends OutputConfigMonitor> 
outputConfigMonitors) throws Exception {}
+
+  @Override
+  public void close() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java
new file mode 100644
index 0000000..71e924a
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass1.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+
+public class LogSearchConfigServerClass1 implements LogSearchConfigServer {
+  @Override
+  public void init(Map<String, String> properties) {}
+
+  @Override
+  public boolean inputConfigExists(String clusterName, String serviceName) 
throws Exception {
+    return false;
+  }
+
+  @Override
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
+  public List<String> getServices(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public String getGlobalConfigs(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public InputConfig getInputConfig(String clusterName, String serviceName) {
+    return null;
+  }
+
+  @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
+
+  @Override
+  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {}
+
+  @Override
+  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception {}
+
+  @Override
+  public void close() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java
new file mode 100644
index 0000000..a767ff5
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigServerClass2.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+
+public class LogSearchConfigServerClass2 implements LogSearchConfigServer {
+  @Override
+  public void init(Map<String, String> properties) {}
+
+  @Override
+  public boolean inputConfigExists(String clusterName, String serviceName) 
throws Exception {
+    return false;
+  }
+
+  @Override
+  public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {}
+
+  @Override
+  public List<String> getServices(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public String getGlobalConfigs(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public InputConfig getInputConfig(String clusterName, String serviceName) {
+    return null;
+  }
+
+  @Override
+  public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) {}
+
+  @Override
+  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {}
+
+  @Override
+  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception {}
+
+  @Override
+  public void close() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
new file mode 100644
index 0000000..c050540
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigLogFeederZK.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
+import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import 
org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class LogSearchConfigLogFeederZK extends LogSearchConfigZK implements 
LogSearchConfigLogFeeder {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LogSearchConfigLogFeederZK.class);
+
+  private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
+
+  private TreeCache logFeederClusterCache;
+
+  @Override
+  public void init(Map<String, String> properties, String clusterName) throws 
Exception {
+    super.init(properties);
+    while (client.checkExists().forPath("/") == null) {
+      LOG.info("Root node is not present yet, going to sleep for " + 
WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
+      Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
+    }
+    
+    logFeederClusterCache = new TreeCache(client, String.format("/%s", 
clusterName));
+  }
+
+  @Override
+  public boolean inputConfigExists(String serviceName) throws Exception {
+    String nodePath = String.format("/input/%s", serviceName);
+    return logFeederClusterCache.getCurrentData(nodePath) != null;
+  }
+
+  @Override
+  public void monitorInputConfigChanges(final InputConfigMonitor 
inputConfigMonitor,
+      final LogLevelFilterMonitor logLevelFilterMonitor, final String 
clusterName) throws Exception {
+    final JsonParser parser = new JsonParser();
+    final JsonArray globalConfigNode = new JsonArray();
+    for (String globalConfigJsonString : 
inputConfigMonitor.getGlobalConfigJsons()) {
+      JsonElement globalConfigJson = parser.parse(globalConfigJsonString);
+      globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global"));
+    }
+    
+    createGlobalConfigNode(globalConfigNode, clusterName);
+    
+    TreeCacheListener listener = new TreeCacheListener() {
+      private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, 
Type.NODE_UPDATED, Type.NODE_REMOVED);
+      
+      public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
+        if (!nodeEvents.contains(event.getType())) {
+          return;
+        }
+        
+        String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
+        String nodeData = new String(event.getData().getData());
+        Type eventType = event.getType();
+        
+        String configPathStab = String.format("/%s/", clusterName);
+        
+        if (event.getData().getPath().startsWith(configPathStab + "input/")) {
+          handleInputConfigChange(eventType, nodeName, nodeData);
+        } else if (event.getData().getPath().startsWith(configPathStab + 
"loglevelfilter/")) {
+          handleLogLevelFilterChange(eventType, nodeName, nodeData);
+        }
+      }
+
+      private void handleInputConfigChange(Type eventType, String nodeName, 
String nodeData) {
+        switch (eventType) {
+          case NODE_ADDED:
+            LOG.info("Node added under input ZK node: " + nodeName);
+            addInputs(nodeName, nodeData);
+            break;
+          case NODE_UPDATED:
+            LOG.info("Node updated under input ZK node: " + nodeName);
+            removeInputs(nodeName);
+            addInputs(nodeName, nodeData);
+            break;
+          case NODE_REMOVED:
+            LOG.info("Node removed from input ZK node: " + nodeName);
+            removeInputs(nodeName);
+            break;
+          default:
+            break;
+        }
+      }
+
+      private void removeInputs(String serviceName) {
+        inputConfigMonitor.removeInputs(serviceName);
+      }
+
+      private void addInputs(String serviceName, String inputConfig) {
+        try {
+          JsonElement inputConfigJson = parser.parse(inputConfig);
+          for (Map.Entry<String, JsonElement> typeEntry : 
inputConfigJson.getAsJsonObject().entrySet()) {
+            for (JsonElement e : typeEntry.getValue().getAsJsonArray()) {
+              for (JsonElement globalConfig : globalConfigNode) {
+                merge(globalConfig.getAsJsonObject(), e.getAsJsonObject());
+              }
+            }
+          }
+          
+          inputConfigMonitor.loadInputConfigs(serviceName, 
InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+        } catch (Exception e) {
+          LOG.error("Could not load input configuration for service " + 
serviceName + ":\n" + inputConfig, e);
+        }
+      }
+
+      private void handleLogLevelFilterChange(Type eventType, String nodeName, 
String nodeData) {
+        switch (eventType) {
+          case NODE_ADDED:
+          case NODE_UPDATED:
+            LOG.info("Node added/updated under loglevelfilter ZK node: " + 
nodeName);
+            LogLevelFilter logLevelFilter = gson.fromJson(nodeData, 
LogLevelFilter.class);
+            logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
+            break;
+          case NODE_REMOVED:
+            LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
+            logLevelFilterMonitor.removeLogLevelFilter(nodeName);
+            break;
+          default:
+            break;
+        }
+      }
+
+      private void merge(JsonObject source, JsonObject target) {
+        for (Map.Entry<String, JsonElement> e : source.entrySet()) {
+          if (!target.has(e.getKey())) {
+            target.add(e.getKey(), e.getValue());
+          } else {
+            if (e.getValue().isJsonObject()) {
+              JsonObject valueJson = (JsonObject)e.getValue();
+              merge(valueJson, target.get(e.getKey()).getAsJsonObject());
+            }
+          }
+        }
+      }
+    };
+    logFeederClusterCache.getListenable().addListener(listener);
+    logFeederClusterCache.start();
+  }
+
+  private void createGlobalConfigNode(JsonArray globalConfigNode, String 
clusterName) {
+    String globalConfigNodePath = String.format("/%s/global", clusterName);
+    String data = InputConfigGson.gson.toJson(globalConfigNode);
+    
+    try {
+      if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) {
+        client.setData().forPath(globalConfigNodePath, data.getBytes());
+      } else {
+        
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath,
 data.getBytes());
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during global config node creation/update", e);
+    }
+  }
+
+  @Override
+  public OutputSolrProperties getOutputSolrProperties(String type) throws 
Exception {
+    String nodePath = String.format("/output/solr/%s", type);
+    ChildData currentData = outputCache.getCurrentData(nodePath);
+    return currentData == null ?
+        null :
+        gson.fromJson(new String(currentData.getData()), 
OutputSolrPropertiesImpl.class);
+  }
+
+  @Override
+  public void monitorOutputProperties(final List<? extends 
OutputConfigMonitor> outputConfigMonitors) throws Exception {
+    TreeCacheListener listener = new TreeCacheListener() {
+      public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
+        if (event.getType() != Type.NODE_UPDATED) {
+          return;
+        }
+        
+        LOG.info("Output config updated: " + event.getData().getPath());
+        for (OutputConfigMonitor monitor : outputConfigMonitors) {
+          String monitorPath = String.format("/output/%s/%s", 
monitor.getDestination(), monitor.getOutputType());
+          if (monitorPath.equals(event.getData().getPath())) {
+            String nodeData = new String(event.getData().getData());
+            OutputSolrProperties outputSolrProperties = 
gson.fromJson(nodeData, OutputSolrPropertiesImpl.class);
+            monitor.outputConfigChanged(outputSolrProperties);
+          }
+        }
+      }
+    };
+    outputCache.getListenable().addListener(listener);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
new file mode 100644
index 0000000..9973196
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfigServer;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
+import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter;
+import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
+import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonParser;
+
+public class LogSearchConfigServerZK extends LogSearchConfigZK implements 
LogSearchConfigServer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LogSearchConfigServerZK.class);
+
+  private TreeCache serverCache;
+
+  @Override
+  public void init(Map<String, String> properties) throws Exception {
+    super.init(properties);
+
+    if (client.checkExists().forPath("/") == null) {
+      client.create().creatingParentContainersIfNeeded().forPath("/");
+    }
+    if (client.checkExists().forPath("/output") == null) {
+      client.create().creatingParentContainersIfNeeded().forPath("/output");
+    }
+    serverCache = new TreeCache(client, "/");
+    serverCache.start();
+  }
+
+  @Override
+  public boolean inputConfigExists(String clusterName, String serviceName) 
throws Exception {
+    String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
+    return serverCache.getCurrentData(nodePath) != null;
+  }
+
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {
+    String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
+    client.setData().forPath(nodePath, inputConfig.getBytes());
+    LOG.info("Set input config for the service " + serviceName + " for cluster 
" + clusterName);
+  }
+
+  @Override
+  public List<String> getServices(String clusterName) {
+    String parentPath = String.format("/%s/input", clusterName);
+    Map<String, ChildData> serviceNodes = 
serverCache.getCurrentChildren(parentPath);
+    return serviceNodes == null ?
+        new ArrayList<>() :
+        new ArrayList<>(serviceNodes.keySet());
+  }
+
+  @Override
+  public String getGlobalConfigs(String clusterName) {
+    String globalConfigNodePath = String.format("/%s/global", clusterName);
+    return new 
String(serverCache.getCurrentData(globalConfigNodePath).getData());
+  }
+
+  @Override
+  public InputConfig getInputConfig(String clusterName, String serviceName) {
+    String globalConfigData = getGlobalConfigs(clusterName);
+    JsonArray globalConfigs = (JsonArray) new 
JsonParser().parse(globalConfigData);
+    InputAdapter.setGlobalConfigs(globalConfigs);
+    
+    ChildData childData = 
serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, 
serviceName));
+    return childData == null ? null : InputConfigGson.gson.fromJson(new 
String(childData.getData()), InputConfigImpl.class);
+  }
+
+  @Override
+  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {
+    for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) 
{
+      String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, 
e.getKey());
+      String logLevelFilterJson = gson.toJson(e.getValue());
+      String currentLogLevelFilterJson = new 
String(serverCache.getCurrentData(nodePath).getData());
+      if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) {
+        client.setData().forPath(nodePath, logLevelFilterJson.getBytes());
+        LOG.info("Set log level filter for the log " + e.getKey() + " for 
cluster " + clusterName);
+      }
+    }
+  }
+
+  @Override
+  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
+    String parentPath = String.format("/%s/loglevelfilter", clusterName);
+    Map<String, ChildData> logLevelFilterNodes = 
serverCache.getCurrentChildren(parentPath);
+    TreeMap<String, LogLevelFilter> filters = new TreeMap<>();
+    for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {
+      LogLevelFilter logLevelFilter = gson.fromJson(new 
String(e.getValue().getData()), LogLevelFilter.class);
+      filters.put(e.getKey(), logLevelFilter);
+    }
+    
+    LogLevelFilterMap logLevelFilters = new LogLevelFilterMap();
+    logLevelFilters.setFilter(filters);
+    return logLevelFilters;
+  }
+
+  @Override
+  public void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception {
+    String nodePath = String.format("/output/solr/%s", type);
+    String data = gson.toJson(outputSolrProperties);
+    if (outputCache.getCurrentData(nodePath) == null) {
+      
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath,
 data.getBytes());
+    } else {
+      client.setData().forPath(nodePath, data.getBytes());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 387d0c6..7037bef 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -22,33 +22,16 @@ package org.apache.ambari.logsearch.config.zookeeper;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
 import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
-import 
org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
-import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter;
-import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
-import 
org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
-import 
org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
-import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
-import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -57,13 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableSet;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 
 public class LogSearchConfigZK implements LogSearchConfig {
   private static final Logger LOG = 
LoggerFactory.getLogger(LogSearchConfigZK.class);
@@ -71,7 +49,6 @@ public class LogSearchConfigZK implements LogSearchConfig {
   private static final int SESSION_TIMEOUT = 15000;
   private static final int CONNECTION_TIMEOUT = 30000;
   private static final String DEFAULT_ZK_ROOT = "/logsearch";
-  private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
   private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
 
   @LogSearchPropertyDescription(
@@ -99,16 +76,12 @@ public class LogSearchConfigZK implements LogSearchConfig {
   )
   private static final String ZK_ROOT_NODE_PROPERTY = 
"logsearch.config.zk_root";
 
-  private Map<String, String> properties;
-  private CuratorFramework client;
-  private Gson gson;
+  protected Map<String, String> properties;
+  protected CuratorFramework client;
+  protected TreeCache outputCache;
+  protected Gson gson;
 
-  private TreeCache serverCache;
-  private TreeCache logFeederClusterCache;
-  private TreeCache outputCache;
-
-  @Override
-  public void init(Component component, Map<String, String> properties, String 
clusterName) throws Exception {
+  public void init(Map<String, String> properties) throws Exception {
     this.properties = properties;
     
     String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, 
DEFAULT_ZK_ROOT);
@@ -124,39 +97,10 @@ public class LogSearchConfigZK implements LogSearchConfig {
     outputCache = new TreeCache(client, "/output");
     outputCache.start();
 
-    if (component == Component.SERVER) {
-      if (client.checkExists().forPath("/") == null) {
-        client.create().creatingParentContainersIfNeeded().forPath("/");
-      }
-      if (client.checkExists().forPath("/output") == null) {
-        client.create().creatingParentContainersIfNeeded().forPath("/output");
-      }
-      serverCache = new TreeCache(client, "/");
-      serverCache.start();
-    } else {
-      while (client.checkExists().forPath("/") == null) {
-        LOG.info("Root node is not present yet, going to sleep for " + 
WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
-        Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
-      }
-      logFeederClusterCache = new TreeCache(client, String.format("/%s", 
clusterName));
-    }
-    
     gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
   }
 
   @Override
-  public boolean inputConfigExistsLogFeeder(String serviceName) throws 
Exception {
-    String nodePath = String.format("/input/%s", serviceName);
-    return logFeederClusterCache.getCurrentData(nodePath) != null;
-  }
-
-  @Override
-  public boolean inputConfigExistsServer(String clusterName, String 
serviceName) throws Exception {
-    String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
-    return serverCache.getCurrentData(nodePath) != null;
-  }
-
-  @Override
   public void createInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {
     String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
     try {
@@ -168,159 +112,6 @@ public class LogSearchConfigZK implements LogSearchConfig 
{
   }
 
   @Override
-  public void setInputConfig(String clusterName, String serviceName, String 
inputConfig) throws Exception {
-    String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
-    client.setData().forPath(nodePath, inputConfig.getBytes());
-    LOG.info("Set input config for the service " + serviceName + " for cluster 
" + clusterName);
-  }
-
-  @Override
-  public void monitorInputConfigChanges(final InputConfigMonitor 
inputConfigMonitor,
-      final LogLevelFilterMonitor logLevelFilterMonitor, final String 
clusterName) throws Exception {
-    final JsonParser parser = new JsonParser();
-    final JsonArray globalConfigNode = new JsonArray();
-    for (String globalConfigJsonString : 
inputConfigMonitor.getGlobalConfigJsons()) {
-      JsonElement globalConfigJson = parser.parse(globalConfigJsonString);
-      globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global"));
-    }
-    
-    createGlobalConfigNode(globalConfigNode, clusterName);
-    
-    TreeCacheListener listener = new TreeCacheListener() {
-      private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, 
Type.NODE_UPDATED, Type.NODE_REMOVED);
-      
-      public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
-        if (!nodeEvents.contains(event.getType())) {
-          return;
-        }
-        
-        String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
-        String nodeData = new String(event.getData().getData());
-        Type eventType = event.getType();
-        
-        String configPathStab = String.format("/%s/", clusterName);
-        
-        if (event.getData().getPath().startsWith(configPathStab + "input/")) {
-          handleInputConfigChange(eventType, nodeName, nodeData);
-        } else if (event.getData().getPath().startsWith(configPathStab + 
"loglevelfilter/")) {
-          handleLogLevelFilterChange(eventType, nodeName, nodeData);
-        }
-      }
-
-      private void handleInputConfigChange(Type eventType, String nodeName, 
String nodeData) {
-        switch (eventType) {
-          case NODE_ADDED:
-            LOG.info("Node added under input ZK node: " + nodeName);
-            addInputs(nodeName, nodeData);
-            break;
-          case NODE_UPDATED:
-            LOG.info("Node updated under input ZK node: " + nodeName);
-            removeInputs(nodeName);
-            addInputs(nodeName, nodeData);
-            break;
-          case NODE_REMOVED:
-            LOG.info("Node removed from input ZK node: " + nodeName);
-            removeInputs(nodeName);
-            break;
-          default:
-            break;
-        }
-      }
-
-      private void removeInputs(String serviceName) {
-        inputConfigMonitor.removeInputs(serviceName);
-      }
-
-      private void addInputs(String serviceName, String inputConfig) {
-        try {
-          JsonElement inputConfigJson = parser.parse(inputConfig);
-          for (Map.Entry<String, JsonElement> typeEntry : 
inputConfigJson.getAsJsonObject().entrySet()) {
-            for (JsonElement e : typeEntry.getValue().getAsJsonArray()) {
-              for (JsonElement globalConfig : globalConfigNode) {
-                merge(globalConfig.getAsJsonObject(), e.getAsJsonObject());
-              }
-            }
-          }
-          
-          inputConfigMonitor.loadInputConfigs(serviceName, 
InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
-        } catch (Exception e) {
-          LOG.error("Could not load input configuration for service " + 
serviceName + ":\n" + inputConfig, e);
-        }
-      }
-
-      private void handleLogLevelFilterChange(Type eventType, String nodeName, 
String nodeData) {
-        switch (eventType) {
-          case NODE_ADDED:
-          case NODE_UPDATED:
-            LOG.info("Node added/updated under loglevelfilter ZK node: " + 
nodeName);
-            LogLevelFilter logLevelFilter = gson.fromJson(nodeData, 
LogLevelFilter.class);
-            logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
-            break;
-          case NODE_REMOVED:
-            LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
-            logLevelFilterMonitor.removeLogLevelFilter(nodeName);
-            break;
-          default:
-            break;
-        }
-      }
-
-      private void merge(JsonObject source, JsonObject target) {
-        for (Map.Entry<String, JsonElement> e : source.entrySet()) {
-          if (!target.has(e.getKey())) {
-            target.add(e.getKey(), e.getValue());
-          } else {
-            if (e.getValue().isJsonObject()) {
-              JsonObject valueJson = (JsonObject)e.getValue();
-              merge(valueJson, target.get(e.getKey()).getAsJsonObject());
-            }
-          }
-        }
-      }
-    };
-    logFeederClusterCache.getListenable().addListener(listener);
-    logFeederClusterCache.start();
-  }
-
-  private void createGlobalConfigNode(JsonArray globalConfigNode, String 
clusterName) {
-    String globalConfigNodePath = String.format("/%s/global", clusterName);
-    String data = InputConfigGson.gson.toJson(globalConfigNode);
-    
-    try {
-      if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) {
-        client.setData().forPath(globalConfigNodePath, data.getBytes());
-      } else {
-        
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath,
 data.getBytes());
-      }
-    } catch (Exception e) {
-      LOG.warn("Exception during global config node creation/update", e);
-    }
-  }
-
-  @Override
-  public List<String> getServices(String clusterName) {
-    String parentPath = String.format("/%s/input", clusterName);
-    Map<String, ChildData> serviceNodes = 
serverCache.getCurrentChildren(parentPath);
-    return new ArrayList<String>(serviceNodes.keySet());
-  }
-
-  @Override
-  public String getGlobalConfigs(String clusterName) {
-    String globalConfigNodePath = String.format("/%s/global", clusterName);
-    return new 
String(serverCache.getCurrentData(globalConfigNodePath).getData());
-  }
-
-  @Override
-  public InputConfig getInputConfig(String clusterName, String serviceName) {
-    String globalConfigData = getGlobalConfigs(clusterName);
-    JsonArray globalConfigs = (JsonArray) new 
JsonParser().parse(globalConfigData);
-    InputAdapter.setGlobalConfigs(globalConfigs);
-    
-    ChildData childData = 
serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, 
serviceName));
-    return childData == null ? null : InputConfigGson.gson.fromJson(new 
String(childData.getData()), InputConfigImpl.class);
-  }
-
-  @Override
   public void createLogLevelFilter(String clusterName, String logId, 
LogLevelFilter filter) throws Exception {
     String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, 
logId);
     String logLevelFilterJson = gson.toJson(filter);
@@ -332,35 +123,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
     }
   }
 
-  @Override
-  public void setLogLevelFilters(String clusterName, LogLevelFilterMap 
filters) throws Exception {
-    for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) 
{
-      String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, 
e.getKey());
-      String logLevelFilterJson = gson.toJson(e.getValue());
-      String currentLogLevelFilterJson = new 
String(serverCache.getCurrentData(nodePath).getData());
-      if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) {
-        client.setData().forPath(nodePath, logLevelFilterJson.getBytes());
-        LOG.info("Set log level filter for the log " + e.getKey() + " for 
cluster " + clusterName);
-      }
-    }
-  }
-
-  @Override
-  public LogLevelFilterMap getLogLevelFilters(String clusterName) {
-    String parentPath = String.format("/%s/loglevelfilter", clusterName);
-    Map<String, ChildData> logLevelFilterNodes = 
serverCache.getCurrentChildren(parentPath);
-    TreeMap<String, LogLevelFilter> filters = new TreeMap<>();
-    for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {
-      LogLevelFilter logLevelFilter = gson.fromJson(new 
String(e.getValue().getData()), LogLevelFilter.class);
-      filters.put(e.getKey(), logLevelFilter);
-    }
-    
-    LogLevelFilterMap logLevelFilters = new LogLevelFilterMap();
-    logLevelFilters.setFilter(filters);
-    return logLevelFilters;
-  }
-
-  private List<ACL> getAcls() {
+  protected List<ACL> getAcls() {
     String aclStr = properties.get(ZK_ACLS_PROPERTY);
     if (StringUtils.isBlank(aclStr)) {
       return ZooDefs.Ids.OPEN_ACL_UNSAFE;
@@ -404,48 +167,6 @@ public class LogSearchConfigZK implements LogSearchConfig {
   }
 
   @Override
-  public void saveOutputSolrProperties(String type, OutputSolrProperties 
outputSolrProperties) throws Exception {
-    String nodePath = String.format("/output/solr/%s", type);
-    String data = gson.toJson(outputSolrProperties);
-    if (outputCache.getCurrentData(nodePath) == null) {
-      
client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath,
 data.getBytes());
-    } else {
-      client.setData().forPath(nodePath, data.getBytes());
-    }
-  }
-
-  @Override
-  public OutputSolrProperties getOutputSolrProperties(String type) throws 
Exception {
-    String nodePath = String.format("/output/solr/%s", type);
-    ChildData currentData = outputCache.getCurrentData(nodePath);
-    return currentData == null ?
-        null :
-        gson.fromJson(new String(currentData.getData()), 
OutputSolrPropertiesImpl.class);
-  }
-
-  @Override
-  public void monitorOutputProperties(final List<? extends 
OutputConfigMonitor> outputConfigMonitors) throws Exception {
-    TreeCacheListener listener = new TreeCacheListener() {
-      public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
-        if (event.getType() != Type.NODE_UPDATED) {
-          return;
-        }
-        
-        LOG.info("Output config updated: " + event.getData().getPath());
-        for (OutputConfigMonitor monitor : outputConfigMonitors) {
-          String monitorPath = String.format("/output/%s/%s", 
monitor.getDestination(), monitor.getOutputType());
-          if (monitorPath.equals(event.getData().getPath())) {
-            String nodeData = new String(event.getData().getData());
-            OutputSolrProperties outputSolrProperties = 
gson.fromJson(nodeData, OutputSolrPropertiesImpl.class);
-            monitor.outputConfigChanged(outputSolrProperties);
-          }
-        }
-      }
-    };
-    outputCache.getListenable().addListener(listener);
-  }
-
-  @Override
   public void close() {
     LOG.info("Closing ZooKeeper Connection");
     client.close();

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 2461819..5114743 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -27,10 +27,9 @@ import java.util.Map;
 
 import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.common.LogEntryParseTester;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
-import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
 import org.apache.commons.io.FileUtils;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
@@ -54,7 +53,7 @@ public class LogFeeder {
   private final LogFeederCommandLine cli;
   
   private ConfigHandler configHandler;
-  private LogSearchConfig config;
+  private LogSearchConfigLogFeeder config;
   
   private MetricsManager metricsManager = new MetricsManager();
 
@@ -80,8 +79,8 @@ public class LogFeeder {
 
     SSLUtil.ensureStorePasswords();
     
-    config = 
LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,Maps.fromProperties(LogFeederPropertiesUtil.getProperties()),
-        LogFeederPropertiesUtil.getClusterName(), LogSearchConfigZK.class);
+    config = 
LogSearchConfigFactory.createLogSearchConfigLogFeeder(Maps.fromProperties(LogFeederPropertiesUtil.getProperties()),
+        LogFeederPropertiesUtil.getClusterName(), 
LogSearchConfigLogFeederZK.class);
     configHandler = new ConfigHandler(config);
     configHandler.init();
     LogLevelFilterHandler.init(config);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2d1ac668/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 11df9dc..243b344 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -49,7 +49,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
-import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
@@ -65,7 +65,7 @@ import com.google.gson.reflect.TypeToken;
 public class ConfigHandler implements InputConfigMonitor {
   private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
 
-  private final LogSearchConfig logSearchConfig;
+  private final LogSearchConfigLogFeeder logSearchConfig;
   
   private final OutputManager outputManager = new OutputManager();
   private final InputManager inputManager = new InputManager();
@@ -79,7 +79,7 @@ public class ConfigHandler implements InputConfigMonitor {
   
   private boolean simulateMode = false;
   
-  public ConfigHandler(LogSearchConfig logSearchConfig) {
+  public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) {
     this.logSearchConfig = logSearchConfig;
   }
   

Reply via email to