Repository: incubator-eagle
Updated Branches:
  refs/heads/master 0cd117c1f -> c57f86dd1


[EAGLE-719] fix configuration bug in applications

Author: anyway1021 <m...@apache.org>

Closes #598 from anyway1021/EAGLE-719.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c57f86dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c57f86dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c57f86dd

Branch: refs/heads/master
Commit: c57f86dd186a87ec749420e50fdf3180959cb16f
Parents: 0cd117c
Author: anyway1021 <m...@apache.org>
Authored: Wed Nov 2 17:51:39 2016 +0800
Committer: anyway1021 <m...@apache.org>
Committed: Wed Nov 2 17:51:39 2016 +0800

----------------------------------------------------------------------
 .../hadoop/queue/HadoopQueueRunningApp.java     |  2 +-
 .../queue/HadoopQueueRunningAppConfig.java      | 17 +------
 .../jpm/aggregation/AggregationApplication.java |  6 +--
 .../jpm/aggregation/AggregationConfig.java      | 19 +++-----
 .../jpm/aggregation/mr/MRMetricAggregator.java  | 26 ++++++-----
 .../mr/MRMetricsAggregateContainer.java         | 19 +++++---
 .../jpm/aggregation/storm/AggregationBolt.java  | 15 +++----
 .../jpm/aggregation/storm/AggregationSpout.java | 20 ++++-----
 .../jpm/mr/history/MRHistoryJobApplication.java |  4 +-
 .../jpm/mr/history/MRHistoryJobConfig.java      | 18 ++------
 .../crawler/DefaultJHFInputStreamCallback.java  | 11 +++--
 .../history/crawler/JHFCrawlerDriverImpl.java   |  6 +--
 .../metrics/JobCountMetricsGenerator.java       | 18 +++++---
 .../metrics/JobCounterMetricsGenerator.java     | 15 ++++---
 .../mr/history/parser/JHFEventReaderBase.java   | 10 +++--
 .../mr/history/parser/JHFMRVer2EventReader.java |  5 ++-
 .../jpm/mr/history/parser/JHFParserFactory.java | 16 ++++---
 ...JobConfigurationCreationServiceListener.java | 16 ++++---
 .../JobEntityCreationEagleServiceListener.java  | 21 +++++----
 .../parser/TaskAttemptCounterListener.java      | 16 ++++---
 .../mr/history/parser/TaskFailureListener.java  | 16 ++++---
 .../jpm/mr/history/storm/JobHistorySpout.java   | 47 +++++++++++---------
 .../jpm/mr/running/MRRunningJobApplication.java |  2 +-
 .../jpm/mr/running/MRRunningJobConfig.java      | 14 +++---
 .../jpm/spark/history/SparkHistoryJobApp.java   |  2 +-
 .../spark/history/SparkHistoryJobAppConfig.java | 10 ++---
 .../apache/eagle/topology/TopologyCheckApp.java |  2 +-
 .../eagle/topology/TopologyCheckAppConfig.java  | 16 ++++---
 .../topology/storm/TopologyDataPersistBolt.java |  2 +-
 .../topology/TestHbaseTopologyCrawler.java      |  2 +-
 .../eagle/topology/TestHdfsTopologyCrawler.java |  2 +-
 .../eagle/topology/TestMRTopologyCrawler.java   |  2 +-
 32 files changed, 206 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
index 7a853a1..77dc0cb 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -27,7 +27,7 @@ import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
 
 public class HadoopQueueRunningApp extends StormApplication {
     public StormTopology execute(Config config, StormEnvironment environment) {
-        HadoopQueueRunningAppConfig appConfig = HadoopQueueRunningAppConfig.getInstance(config);
+        HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config);
 
         IRichSpout spout = new HadoopQueueRunningSpout(appConfig);
         HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
index e370ce9..d398028 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
@@ -21,7 +21,6 @@ import com.typesafe.config.Config;
 import java.io.Serializable;
 
 public class HadoopQueueRunningAppConfig implements Serializable {
-    public static final HadoopQueueRunningAppConfig instance = new HadoopQueueRunningAppConfig();
 
     public Topology topology;
     public DataSourceConfig dataSourceConfig;
@@ -29,11 +28,11 @@ public class HadoopQueueRunningAppConfig implements Serializable {
 
     private Config config = null;
 
-    private HadoopQueueRunningAppConfig() {
+    public HadoopQueueRunningAppConfig(Config config) {
         this.topology = new Topology();
         this.dataSourceConfig = new DataSourceConfig();
         this.eagleProps = new EagleProps();
-        this.config = null;
+        init(config);
     }
 
     public static class Topology implements Serializable {
@@ -61,18 +60,6 @@ public class HadoopQueueRunningAppConfig implements Serializable {
         }
     }
 
-    public static HadoopQueueRunningAppConfig getInstance(Config config) {
-        if (config != null && instance.config == null) {
-            synchronized (instance) {
-                if (instance.config == null) {
-                    instance.init(config);
-                }
-                return instance;
-            }
-        }
-        return instance;
-    }
-
     public Config getConfig() {
         return config;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
index 750c9ab..ecddbce 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
@@ -54,17 +54,17 @@ public class AggregationApplication extends 
StormApplication {
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrHistoryAggregationSpout";
         String boltName = "mrHistoryAggregationBolt";
-        AggregationConfig aggregationConfig = 
AggregationConfig.getInstance(config);
+        AggregationConfig aggregationConfig = 
AggregationConfig.newInstance(config);
         int tasks = aggregationConfig.getConfig().getInt("stormConfig." + 
spoutName + "Tasks");
         topologyBuilder.setSpout(
             spoutName,
-            new AggregationSpout(config, new 
MRMetricsAggregateContainer(metrics)),
+            new AggregationSpout(aggregationConfig, new 
MRMetricsAggregateContainer(metrics, aggregationConfig)),
             tasks
         ).setNumTasks(tasks);
 
         tasks = aggregationConfig.getConfig().getInt("stormConfig." + boltName 
+ "Tasks");
         topologyBuilder.setBolt(boltName,
-            new AggregationBolt(config, new 
MRMetricsAggregateContainer(metrics)),
+            new AggregationBolt(aggregationConfig.getStormConfig(), new 
MRMetricsAggregateContainer(metrics, aggregationConfig)),
             tasks).setNumTasks(tasks).shuffleGrouping(spoutName);
 
         return topologyBuilder.createTopology();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
index e17c5b6..b4f56bf 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
@@ -72,25 +72,15 @@ public class AggregationConfig implements Serializable {
         public String password;
     }
 
-    private static AggregationConfig manager = new AggregationConfig();
-
-    private AggregationConfig() {
+    private AggregationConfig(Config config) {
         this.zkStateConfig = new ZKStateConfig();
         this.stormConfig = new StormConfig();
         this.eagleServiceConfig = new EagleServiceConfig();
-        this.config = null;
+        init(config);
     }
 
-    public static AggregationConfig getInstance(Config config) {
-        if (config != null && manager.config == null) {
-            manager.init(config);
-        }
-
-        return manager;
-    }
-
-    public static AggregationConfig get() {
-        return getInstance(null);
+    public static AggregationConfig newInstance(Config config) {
+        return new AggregationConfig(config);
     }
 
     /**
@@ -98,6 +88,7 @@ public class AggregationConfig implements Serializable {
      */
     private void init(Config config) {
         this.config = config;
+
         //parse stormConfig
         this.stormConfig.site = config.getString("siteId");
         this.stormConfig.aggregationDuration = 
config.getLong("stormConfig.aggregationDuration");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
index 4941552..f8840b2 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.*;
 
+import static 
org.apache.eagle.jpm.aggregation.AggregationConfig.EagleServiceConfig;
+
 public class MRMetricAggregator implements MetricAggregator, Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(MRMetricAggregator.class);
 
@@ -39,25 +41,29 @@ public class MRMetricAggregator implements 
MetricAggregator, Serializable {
     private List<List<String>> aggregateColumns;
     //key is AggregatorColumns, value is a map(key is timeStamp, value is 
metric value)
     private Map<AggregatorColumns, Map<Long, Long>> aggregateValues;
+    private AggregationConfig appConfig;
+    private EagleServiceConfig eagleServiceConfig;
 
-    public MRMetricAggregator(String metric, List<List<String>> 
aggregateColumns) {
+    public MRMetricAggregator(String metric, List<List<String>> 
aggregateColumns, AggregationConfig appConfig) {
         this.metric = metric;
         this.aggregateColumns = aggregateColumns;
         this.aggregateValues = new TreeMap<>();
+        this.appConfig = appConfig;
+        eagleServiceConfig = appConfig.getEagleServiceConfig();
     }
 
     @Override
     public boolean aggregate(long startTime, long endTime) {
         LOG.info("start to aggregate {} from {} to {}", metric, startTime, 
endTime);
         IEagleServiceClient client = new EagleServiceClientImpl(
-            AggregationConfig.get().getEagleServiceConfig().eagleServiceHost,
-            AggregationConfig.get().getEagleServiceConfig().eagleServicePort,
-            AggregationConfig.get().getEagleServiceConfig().username,
-            AggregationConfig.get().getEagleServiceConfig().password);
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
 
         String query = String.format("%s[@site=\"%s\"]{*}",
             Constants.GENERIC_METRIC_SERVICE,
-            AggregationConfig.get().getStormConfig().site);
+            appConfig.getStormConfig().site);
 
         GenericServiceAPIResponseEntity response;
         try {
@@ -115,10 +121,10 @@ public class MRMetricAggregator implements 
MetricAggregator, Serializable {
 
     private boolean flush() {
         IEagleServiceClient client = new EagleServiceClientImpl(
-            AggregationConfig.get().getEagleServiceConfig().eagleServiceHost,
-            AggregationConfig.get().getEagleServiceConfig().eagleServicePort,
-            AggregationConfig.get().getEagleServiceConfig().username,
-            AggregationConfig.get().getEagleServiceConfig().password);
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
 
         List<GenericMetricEntity> entities = new ArrayList<>();
         for (AggregatorColumns aggregatorColumns : 
this.aggregateValues.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
index 087e44c..00d0457 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
@@ -34,31 +34,36 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.eagle.jpm.aggregation.AggregationConfig.EagleServiceConfig;
+
 public class MRMetricsAggregateContainer implements MetricsAggregateContainer, 
Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(MRMetricsAggregateContainer.class);
 
     private Map<String, MetricAggregator> metricAggregators;
+    private AggregationConfig appConfig;
 
-    public MRMetricsAggregateContainer(Map<String, List<List<String>>> 
metrics) {
+    public MRMetricsAggregateContainer(Map<String, List<List<String>>> 
metrics, AggregationConfig appConfig) {
         this.metricAggregators = new HashMap<>();
         //metric name, aggregate columns
         for (String metric : metrics.keySet()) {
-            this.metricAggregators.put(metric, new MRMetricAggregator(metric, 
metrics.get(metric)));
+            this.metricAggregators.put(metric, new MRMetricAggregator(metric, 
metrics.get(metric), appConfig));
         }
+        this.appConfig = appConfig;
     }
 
     @Override
     public long fetchLatestJobProcessTime() {
         try {
+            EagleServiceConfig eagleServiceConfig = 
appConfig.getEagleServiceConfig();
             IEagleServiceClient client = new EagleServiceClientImpl(
-                
AggregationConfig.get().getEagleServiceConfig().eagleServiceHost,
-                
AggregationConfig.get().getEagleServiceConfig().eagleServicePort,
-                AggregationConfig.get().getEagleServiceConfig().username,
-                AggregationConfig.get().getEagleServiceConfig().password);
+                    eagleServiceConfig.eagleServiceHost,
+                    eagleServiceConfig.eagleServicePort,
+                    eagleServiceConfig.username,
+                    eagleServiceConfig.password);
 
             String query = 
String.format("%s[@site=\"%s\"]<@site>{max(currentTimeStamp)}",
                 Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME,
-                AggregationConfig.get().getStormConfig().site);
+                appConfig.getStormConfig().site);
 
             GenericServiceAPIResponseEntity response = client
                 .search(query)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java
index f4521cd..bb2e3de 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java
@@ -18,33 +18,32 @@
 
 package org.apache.eagle.jpm.aggregation.storm;
 
-import org.apache.eagle.jpm.aggregation.AggregationConfig;
-import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
+import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.eagle.jpm.aggregation.AggregationConfig.StormConfig;
+
 public class AggregationBolt extends BaseRichBolt {
     private static final Logger LOG = 
LoggerFactory.getLogger(AggregationBolt.class);
-    private Config config;
+    private StormConfig stormConfig;
     private OutputCollector collector;
     private MetricsAggregateContainer metricsAggregateContainer;
 
-    public AggregationBolt(Config config, MetricsAggregateContainer 
metricsAggregateContainer) {
-        this.config = config;
+    public AggregationBolt(StormConfig stormConfig, MetricsAggregateContainer 
metricsAggregateContainer) {
+        this.stormConfig = stormConfig;
         this.metricsAggregateContainer = metricsAggregateContainer;
     }
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-        AggregationConfig.getInstance(config);
         this.collector = outputCollector;
     }
 
@@ -52,7 +51,7 @@ public class AggregationBolt extends BaseRichBolt {
     public void execute(Tuple tuple) {
         Long startTime = tuple.getLongByField("startTime");
         LOG.info("get startTime {}", startTime);
-        Long endTime = startTime + 
AggregationConfig.get().getStormConfig().aggregationDuration * 1000;
+        Long endTime = startTime + stormConfig.aggregationDuration * 1000;
 
         if (metricsAggregateContainer.aggregate(startTime, endTime)) {
             collector.ack(tuple);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
index 8a2f06a..3ee0519 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
@@ -18,17 +18,16 @@
 
 package org.apache.eagle.jpm.aggregation.storm;
 
-import org.apache.eagle.jpm.aggregation.AggregationConfig;
-import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer;
-import org.apache.eagle.jpm.aggregation.state.AggregationTimeManager;
-import org.apache.eagle.jpm.util.Utils;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import com.typesafe.config.Config;
+import org.apache.eagle.jpm.aggregation.AggregationConfig;
+import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer;
+import org.apache.eagle.jpm.aggregation.state.AggregationTimeManager;
+import org.apache.eagle.jpm.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,23 +40,22 @@ public class AggregationSpout extends BaseRichSpout {
     private static final Long MAX_WAIT_TIME = 12 * 60 * 60000L;//12 hours
     private static final Long MAX_SAFE_TIME = 6 * 60 * 60000L;//6 hours
 
-    private Config config;
+    private AggregationConfig appConfig;
     MetricsAggregateContainer jobProcessTime;
 
     private SpoutOutputCollector collector;
     private Set<Long> processStartTime;
     private Long lastUpdateTime;
 
-    public AggregationSpout(Config config, MetricsAggregateContainer 
jobProcessTime) {
-        this.config = config;
+    public AggregationSpout(AggregationConfig appConfig, 
MetricsAggregateContainer jobProcessTime) {
+        this.appConfig = appConfig;
         this.jobProcessTime = jobProcessTime;
         this.processStartTime = new HashSet<>();
     }
 
     @Override
     public void open(Map conf, TopologyContext context, final 
SpoutOutputCollector collector) {
-        AggregationConfig.getInstance(config);
-        
AggregationTimeManager.instance().init(AggregationConfig.get().getZkStateConfig());
+        AggregationTimeManager.instance().init(appConfig.getZkStateConfig());
         this.collector = collector;
     }
 
@@ -90,7 +88,7 @@ public class AggregationSpout extends BaseRichSpout {
             for (Long startTime = lastUpdateTime; startTime < lastUpdateTime + 
MAX_SAFE_TIME;) {
                 collector.emit(new Values(startTime), startTime);
                 this.processStartTime.add(startTime);
-                startTime += 
AggregationConfig.get().getStormConfig().aggregationDuration * 1000;
+                startTime += appConfig.getStormConfig().aggregationDuration * 
1000;
             }
         } catch (Exception e) {
             LOG.warn("{}", e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index aaf65ac..e4e206f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -37,7 +37,7 @@ public class MRHistoryJobApplication extends StormApplication 
{
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         //1. trigger init conf
-        MRHistoryJobConfig appConfig = MRHistoryJobConfig.getInstance(config);
+        MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(config);
         com.typesafe.config.Config jhfAppConf = appConfig.getConfig();
 
         //2. init JobHistoryContentFilter
@@ -65,7 +65,7 @@ public class MRHistoryJobApplication extends StormApplication 
{
         int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks");
         topologyBuilder.setSpout(
                 spoutName,
-                new JobHistorySpout(filter, config),
+                new JobHistorySpout(filter, appConfig),
                 tasks
         ).setNumTasks(tasks);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index 496aa77..b0c934a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -85,31 +85,21 @@ public class MRHistoryJobConfig implements Serializable {
         public int readTimeoutSeconds;
     }
 
-    private static MRHistoryJobConfig manager = new MRHistoryJobConfig();
-
     /**
      * As this is singleton object and constructed while this class is being 
initialized,
      * so any exception within this constructor will be wrapped with 
java.lang.ExceptionInInitializerError.
      * And this is unrecoverable and hard to troubleshooting.
      */
-    private MRHistoryJobConfig() {
+    private MRHistoryJobConfig(Config config) {
         this.zkStateConfig = new ZKStateConfig();
         this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
         this.jobHistoryEndpointConfig.hdfs = new HashMap<>();
         this.eagleServiceConfig = new EagleServiceConfig();
-        this.config = null;
-    }
-
-    public static MRHistoryJobConfig getInstance(Config config) {
-        if (config != null && manager.config == null) {
-            manager.init(config);
-        }
-
-        return manager;
+        init(config);
     }
 
-    public static MRHistoryJobConfig get() {
-        return getInstance(null);
+    public static MRHistoryJobConfig newInstance(Config config) {
+        return new MRHistoryJobConfig(config);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index af83985..2b5e15c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -28,16 +28,19 @@ import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
+
 public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class);
 
-
     private JobHistoryContentFilter filter;
     private EagleOutputCollector collector;
+    private MRHistoryJobConfig appConfig;
 
-    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, 
EagleOutputCollector eagleCollector) {
+    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, 
EagleOutputCollector eagleCollector, MRHistoryJobConfig appConfig) {
         this.filter = filter;
         this.collector = eagleCollector;
+        this.appConfig = appConfig;
     }
 
     @Override
@@ -45,7 +48,7 @@ public class DefaultJHFInputStreamCallback implements 
JHFInputStreamCallback {
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", 
MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site);
+                put("site", appConfig.getJobHistoryEndpointConfig().site);
             }
         };
 
@@ -54,7 +57,7 @@ public class DefaultJHFInputStreamCallback implements 
JHFInputStreamCallback {
             jobFileInputStream.close();
         } else {
             //get parser and parse, do not need to emit data now
-            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, 
filter, this.collector);
+            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, 
filter, this.collector, appConfig);
             parser.parse(jobFileInputStream);
             jobFileInputStream.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index ef38b43..9c9374d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -61,13 +61,13 @@ public class JHFCrawlerDriverImpl implements 
JHFCrawlerDriver {
     private JobCountMetricsGenerator jobCountMetricsGenerator;
 
     public JHFCrawlerDriverImpl(JHFInputStreamCallback reader,
-                                JobHistoryLCM historyLCM, JobIdFilter 
jobFilter, int partitionId) throws Exception {
+                                JobHistoryLCM historyLCM, JobIdFilter 
jobFilter, int partitionId, MRHistoryJobConfig appConfig) throws Exception {
         this.reader = reader;
         jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
         this.partitionId = partitionId;
         this.jobFilter = jobFilter;
-        timeZone = 
TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone);
-        jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone);
+        timeZone = 
TimeZone.getTimeZone(appConfig.getJobHistoryEndpointConfig().timeZone);
+        jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone, 
appConfig);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
index f74f77a..573d3cb 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -32,13 +32,18 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig;
+
 public class JobCountMetricsGenerator {
     private static final Logger LOG = 
LoggerFactory.getLogger(JobCountMetricsGenerator.class);
 
     private TimeZone timeZone;
 
-    public JobCountMetricsGenerator(TimeZone timeZone) {
+    private MRHistoryJobConfig appConfig;
+
+    public JobCountMetricsGenerator(TimeZone timeZone, MRHistoryJobConfig 
appConfig) {
         this.timeZone = timeZone;
+        this.appConfig = appConfig;
     }
 
     public void flush(String date, int year, int month, int day) throws 
Exception {
@@ -58,11 +63,12 @@ public class JobCountMetricsGenerator {
         }
         int failed = total - killed - succeeded;
 
+        EagleServiceConfig eagleServiceConfig = 
appConfig.getEagleServiceConfig();
         final IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
 
         GregorianCalendar cal = new GregorianCalendar(year, month, day);
@@ -88,7 +94,7 @@ public class JobCountMetricsGenerator {
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", 
MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site);
+                put("site", appConfig.getJobHistoryEndpointConfig().site);
                 put(MRJobTagName.JOB_STATUS.toString(), state);
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
index 6291b37..6020a3b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
@@ -34,6 +34,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig;
+
 public class JobCounterMetricsGenerator {
     private static final Logger LOG = 
LoggerFactory.getLogger(JobCounterMetricsGenerator.class);
     private static final int BATCH_SIZE = 1000;
@@ -45,8 +47,11 @@ public class JobCounterMetricsGenerator {
     private List<GenericMetricEntity> lastEntitiesBatch;
     private Map<String, String> baseTags;
 
-    public JobCounterMetricsGenerator() {
+    private EagleServiceConfig eagleServiceConfig;
+
+    public JobCounterMetricsGenerator(EagleServiceConfig eagleServiceConfig) {
         this.lastEntitiesBatch = null;
+        this.eagleServiceConfig = eagleServiceConfig;
     }
 
     public void setBaseTags(Map<String, String> tags) {
@@ -115,10 +120,10 @@ public class JobCounterMetricsGenerator {
         }
 
         IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         for (List<GenericMetricEntity> entities : metricEntities) {
             LOG.info("start flushing entities of total number " + 
entities.size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 8b35a56..e48370d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -79,6 +79,8 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
 
     private JobCounterMetricsGenerator jobCounterMetricsGenerator;
 
+    private MRHistoryJobConfig appConfig;
+
     /**
      * baseTags stores the basic tag name values which might be used for 
persisting various entities.
      * baseTags includes: site and jobName
@@ -86,7 +88,7 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
      *
      * @param baseTags
      */
-    public JHFEventReaderBase(Map<String, String> baseTags, Configuration 
configuration, JobHistoryContentFilter filter) {
+    public JHFEventReaderBase(Map<String, String> baseTags, Configuration 
configuration, JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) {
         this.filter = filter;
 
         this.baseTags = baseTags;
@@ -119,7 +121,9 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
         }
         this.sumMapTaskDuration = 0L;
         this.sumReduceTaskDuration = 0L;
-        this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator();
+
+        this.appConfig = appConfig;
+        this.jobCounterMetricsGenerator = new 
JobCounterMetricsGenerator(appConfig.getEagleServiceConfig());
     }
 
     public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
@@ -149,7 +153,7 @@ public abstract class JHFEventReaderBase extends 
JobEntityCreationPublisher impl
     }
 
     private String buildJobTrackingUrl(String jobId) {
-        String jobTrackingUrlBase = 
MRHistoryJobConfig.getInstance(null).getJobHistoryEndpointConfig().mrHistoryServerUrl
 + "/jobhistory/job/";
+        String jobTrackingUrlBase = 
appConfig.getJobHistoryEndpointConfig().mrHistoryServerUrl + "/jobhistory/job/";
         try {
             URI oldUri = new URI(jobTrackingUrlBase);
             URI resolved = oldUri.resolve(jobId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 5324c02..6e0e3aa 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -42,8 +43,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
      *
      * @throws IOException
      */
-    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration 
configuration, JobHistoryContentFilter filter) {
-        super(baseTags, configuration, filter);
+    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration 
configuration, JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) {
+        super(baseTags, configuration, filter, appConfig);
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index fd5483b..784d107 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.hadoop.conf.Configuration;
@@ -33,12 +34,15 @@ public class JHFParserFactory {
     public static JHFParserBase getParser(Map<String, String> baseTags,
                                           Configuration configuration,
                                           JobHistoryContentFilter filter,
-                                          EagleOutputCollector 
outputCollector) {
-        JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, 
configuration, filter);
-        reader2.addListener(new 
JobEntityCreationEagleServiceListener(outputCollector));
-        reader2.addListener(new TaskFailureListener());
-        reader2.addListener(new TaskAttemptCounterListener());
-        reader2.addListener(new JobConfigurationCreationServiceListener());
+                                          EagleOutputCollector outputCollector,
+                                          MRHistoryJobConfig appConfig) {
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = 
appConfig.getEagleServiceConfig();
+
+        JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, 
configuration, filter, appConfig);
+        reader2.addListener(new 
JobEntityCreationEagleServiceListener(outputCollector, appConfig));
+        reader2.addListener(new TaskFailureListener(eagleServiceConfig));
+        reader2.addListener(new 
TaskAttemptCounterListener(eagleServiceConfig));
+        reader2.addListener(new 
JobConfigurationCreationServiceListener(eagleServiceConfig));
 
         reader2.register(new JobEntityLifecycleAggregator());
         JHFParserBase parser = new JHFMRVer2Parser(reader2);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 28020b0..96f2b3b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -29,12 +29,16 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig;
+
 public class JobConfigurationCreationServiceListener implements 
HistoryJobEntityLifecycleListener {
     private static final Logger logger = 
LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
     private static final int MAX_RETRY_TIMES = 3;
     private JobConfigurationAPIEntity jobConfigurationEntity;
+    private EagleServiceConfig eagleServiceConfig;
 
-    public JobConfigurationCreationServiceListener() {
+    public JobConfigurationCreationServiceListener(EagleServiceConfig 
eagleServiceConfig) {
+        this.eagleServiceConfig = eagleServiceConfig;
     }
 
     @Override
@@ -54,12 +58,12 @@ public class JobConfigurationCreationServiceListener 
implements HistoryJobEntity
     @Override
     public void flush() throws Exception {
         IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
-        
client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds
 * 1000);
+        
client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 
1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
         list.add(jobConfigurationEntity);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 2c7a2b1..902590b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -49,18 +49,20 @@ public class JobEntityCreationEagleServiceListener 
implements HistoryJobEntityCr
     private JobExecutionMetricsCreationListener 
jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
     private TimeZone timeZone;
     private EagleOutputCollector collector;
+    private MRHistoryJobConfig appConfig;
 
-    public JobEntityCreationEagleServiceListener(EagleOutputCollector 
collector) {
-        this(BATCH_SIZE, collector);
+    public JobEntityCreationEagleServiceListener(EagleOutputCollector 
collector, MRHistoryJobConfig appConfig) {
+        this(BATCH_SIZE, collector, appConfig);
     }
 
-    public JobEntityCreationEagleServiceListener(int batchSize, 
EagleOutputCollector collector) {
+    public JobEntityCreationEagleServiceListener(int batchSize, 
EagleOutputCollector collector, MRHistoryJobConfig appConfig) {
         if (batchSize <= 0) {
             throw new IllegalArgumentException("batchSize must be greater than 
0 when it is provided");
         }
         this.batchSize = batchSize;
         this.collector = collector;
-        timeZone = 
TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone);
+        this.appConfig = appConfig;
+        timeZone = 
TimeZone.getTimeZone(appConfig.getJobHistoryEndpointConfig().timeZone);
     }
 
     @Override
@@ -86,13 +88,14 @@ public class JobEntityCreationEagleServiceListener 
implements HistoryJobEntityCr
      */
     @Override
     public void flush() throws Exception {
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = 
appConfig.getEagleServiceConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
-        
client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds
 * 1000);
+        
client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 
1000);
         logger.info("start flushing entities of total number " + list.size());
         List<GenericMetricEntity> metricEntities = new ArrayList<>();
         for (int i = 0; i < list.size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index e663b29..666b3db 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -30,12 +30,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig;
+
 public class TaskAttemptCounterListener implements 
HistoryJobEntityCreationListener {
     private static final Logger logger = 
LoggerFactory.getLogger(TaskAttemptCounterListener.class);
     private static final int BATCH_SIZE = 1000;
     private Map<CounterKey, CounterValue> counters = new HashMap<>();
+    private EagleServiceConfig eagleServiceConfig;
 
-    public TaskAttemptCounterListener() {
+    public TaskAttemptCounterListener(EagleServiceConfig eagleServiceConfig) {
+        this.eagleServiceConfig = eagleServiceConfig;
     }
 
     private static class CounterKey {
@@ -111,12 +115,12 @@ public class TaskAttemptCounterListener implements 
HistoryJobEntityCreationListe
     @Override
     public void flush() throws Exception {
         IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
-        
client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds
 * 1000);
+        
client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 
1000);
         List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
         logger.info("start flushing TaskAttemptCounter entities of total 
number " + counters.size());
         // create entity

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index ef62a6c..46056c4 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -36,6 +36,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig;
+
 public class TaskFailureListener implements HistoryJobEntityCreationListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(TaskFailureListener.class);
     private static final String MR_ERROR_CATEGORY_CONFIG_FILE_NAME = 
"MRErrorCategory.config";
@@ -44,8 +46,10 @@ public class TaskFailureListener implements 
HistoryJobEntityCreationListener {
 
     private final List<TaskFailureCountAPIEntity> failureTasks = new 
ArrayList<TaskFailureCountAPIEntity>();
     private final MRErrorClassifier classifier;
+    private EagleServiceConfig eagleServiceConfig;
 
-    public TaskFailureListener() {
+    public TaskFailureListener(EagleServiceConfig eagleServiceConfig) {
+        this.eagleServiceConfig = eagleServiceConfig;
         InputStream is = null;
         try {
             is = 
TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
@@ -109,12 +113,12 @@ public class TaskFailureListener implements 
HistoryJobEntityCreationListener {
     @Override
     public void flush() throws Exception {
         IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
-        
client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds
 * 1000);
+        
client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 
1000);
 
         int tried = 0;
         while (tried <= MAX_RETRY_TIMES) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 8dc951f..436dbeb 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,6 +18,10 @@
 
 package org.apache.eagle.jpm.mr.history.storm;
 
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
@@ -26,14 +30,8 @@ import 
org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
 import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.util.JobIdFilterByPartition;
-import org.apache.eagle.jpm.util.JobIdPartitioner;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +40,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
+
 
 /**
  * Zookeeper znode structure
@@ -95,20 +95,22 @@ public class JobHistorySpout extends BaseRichSpout {
     private JHFInputStreamCallback callback;
     private JobHistoryLCM jhfLCM;
     private static final int MAX_RETRY_TIMES = 3;
-    private Config config;
+    private MRHistoryJobConfig appConfig;
+    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
 
-    public JobHistorySpout(JobHistoryContentFilter filter, Config config) {
-        this(filter, config, new JobHistorySpoutCollectorInterceptor());
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig 
appConfig) {
+        this(filter, appConfig, new JobHistorySpoutCollectorInterceptor());
     }
 
     /**
      * mostly this constructor signature is for unit test purpose as you can 
put customized interceptor here.
      */
-    public JobHistorySpout(JobHistoryContentFilter filter, Config config, 
JobHistorySpoutCollectorInterceptor adaptor) {
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig 
appConfig, JobHistorySpoutCollectorInterceptor adaptor) {
         this.contentFilter = filter;
-        this.config = config;
         this.interceptor = adaptor;
-        callback = new DefaultJHFInputStreamCallback(contentFilter, 
interceptor);
+        this.appConfig = appConfig;
+        jobHistoryEndpointConfig = appConfig.getJobHistoryEndpointConfig();
+        callback = new DefaultJHFInputStreamCallback(contentFilter, 
interceptor,  appConfig);
     }
 
     private int calculatePartitionId(TopologyContext context) {
@@ -129,7 +131,6 @@ public class JobHistorySpout extends BaseRichSpout {
     @Override
     public void open(Map conf, TopologyContext context,
                      final SpoutOutputCollector collector) {
-        MRHistoryJobConfig.getInstance(config);
         partitionId = calculatePartitionId(context);
         // sanity verify 0<=partitionId<=numTotalPartitions-1
         if (partitionId < 0 || partitionId > numTotalPartitions) {
@@ -137,17 +138,18 @@ public class JobHistorySpout extends BaseRichSpout {
                 + partitionId + " and numTotalPartitions " + 
numTotalPartitions);
         }
         JobIdFilter jobIdFilter = new JobIdFilterByPartition(new 
DefaultJobIdPartitioner(), numTotalPartitions, partitionId);
-        
JobHistoryZKStateManager.instance().init(MRHistoryJobConfig.get().getZkStateConfig());
+        JobHistoryZKStateManager.instance().init(appConfig.getZkStateConfig());
         
JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
         interceptor.setSpoutOutputCollector(collector);
 
         try {
-            jhfLCM = new 
JobHistoryDAOImpl(MRHistoryJobConfig.get().getJobHistoryEndpointConfig());
+            jhfLCM = new JobHistoryDAOImpl(jobHistoryEndpointConfig);
             driver = new JHFCrawlerDriverImpl(
                 callback,
                 jhfLCM,
                 jobIdFilter,
-                partitionId);
+                partitionId,
+                appConfig);
         } catch (Exception e) {
             LOG.error("failing creating crawler driver");
             throw new IllegalStateException(e);
@@ -227,7 +229,7 @@ public class JobHistorySpout extends BaseRichSpout {
         LOG.info("update process time stamp {}", minTimeStamp);
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", 
MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site);
+                put("site", jobHistoryEndpointConfig.site);
             }
         };
         JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
@@ -235,13 +237,14 @@ public class JobHistorySpout extends BaseRichSpout {
         entity.setTimestamp(minTimeStamp);
         entity.setTags(baseTags);
 
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = 
appConfig.getEagleServiceConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
-            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
-            MRHistoryJobConfig.get().getEagleServiceConfig().username,
-            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
 
-        
client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds
 * 1000);
+        
client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 
1000);
 
         List<JobProcessTimeStampEntity> entities = new ArrayList<>();
         entities.add(entity);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index 16e8ea7..bfdde13 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -33,7 +33,7 @@ public class MRRunningJobApplication extends StormApplication 
{
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         //1. trigger init conf
-        MRRunningJobConfig mrRunningJobConfig = 
MRRunningJobConfig.getInstance(config);
+        MRRunningJobConfig mrRunningJobConfig = 
MRRunningJobConfig.newInstance(config);
 
         String[] confKeyPatternsSplit = 
mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",");
         List<String> confKeyKeys = new 
ArrayList<>(confKeyPatternsSplit.length);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 10d8f76..975e821 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -78,27 +78,25 @@ public class MRRunningJobConfig implements Serializable {
 
     private Config config;
 
-    private static MRRunningJobConfig manager = new MRRunningJobConfig();
-
-    private MRRunningJobConfig() {
+    private MRRunningJobConfig(Config config) {
         this.eagleServiceConfig = new EagleServiceConfig();
         this.endpointConfig = new EndpointConfig();
         this.zkStateConfig = new ZKStateConfig();
+        init(config);
     }
 
-    public static MRRunningJobConfig getInstance(String[] args) {
+    public static MRRunningJobConfig newInstance(String[] args) {
         try {
             LOG.info("Loading from configuration file");
-            return getInstance(new ConfigOptionParser().load(args));
+            return newInstance(new ConfigOptionParser().load(args));
         } catch (Exception e) {
             LOG.error("failed to load config");
             throw new IllegalArgumentException("Failed to load config", e);
         }
     }
 
-    public static MRRunningJobConfig getInstance(Config config) {
-        manager.init(config);
-        return manager;
+    public static MRRunningJobConfig newInstance(Config config) {
+        return new MRRunningJobConfig(config);
     }
 
     private void init(Config config) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
index 8a3097d..e30a0d8 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
@@ -29,7 +29,7 @@ public class SparkHistoryJobApp extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         // 1. Init conf
-        SparkHistoryJobAppConfig sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.getInstance(config);
+        SparkHistoryJobAppConfig sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
 
         final String jobFetchSpoutName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_FETCH_SPOUT_NAME;
         final String jobParseBoltName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_PARSE_BOLT_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 5049b40..86f13ff 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -40,23 +40,21 @@ public class SparkHistoryJobAppConfig implements Serializable {
 
     private Config config;
 
-    private static SparkHistoryJobAppConfig manager = new SparkHistoryJobAppConfig();
-    
     public Config getConfig() {
         return config;
     }
 
-    public SparkHistoryJobAppConfig() {
+    private SparkHistoryJobAppConfig(Config config) {
         this.zkStateConfig = new ZKStateConfig();
         this.jobHistoryConfig = new JobHistoryEndpointConfig();
         this.jobHistoryConfig.hdfs = new HashMap<>();
         this.eagleInfo = new EagleInfo();
         this.stormConfig = new StormConfig();
+        init(config);
     }
 
-    public static SparkHistoryJobAppConfig getInstance(Config config) {
-        manager.init(config);
-        return manager;
+    public static SparkHistoryJobAppConfig newInstance(Config config) {
+        return new SparkHistoryJobAppConfig(config);
     }
 
     private void init(Config config) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index 7760e3d..95bcb4d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -29,7 +29,7 @@ import org.apache.eagle.topology.storm.TopologyDataPersistBolt;
 public class TopologyCheckApp extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
-        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
 
         String spoutName = TopologyCheckAppConfig.TOPOLOGY_DATA_FETCH_SPOUT_NAME;
         String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index f6510da..409a87f 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -44,21 +44,23 @@ public class TopologyCheckAppConfig implements Serializable {
     public MRConfig mrConfig;
     public List<TopologyConstants.TopologyType> topologyTypes;
 
-    public Config config;
+    private Config config;
 
-    private static TopologyCheckAppConfig configManager = new TopologyCheckAppConfig();
-
-    private TopologyCheckAppConfig() {
+    private TopologyCheckAppConfig(Config config) {
         hBaseConfig = null;
         hdfsConfig = null;
         mrConfig = null;
         dataExtractorConfig = new DataExtractorConfig();
         topologyTypes = new ArrayList<>();
+        init(config);
+    }
+
+    public Config getConfig() {
+        return config;
     }
 
-    public static TopologyCheckAppConfig getInstance(Config config) {
-        configManager.init(config);
-        return configManager;
+    public static TopologyCheckAppConfig newInstance(Config config) {
+        return new TopologyCheckAppConfig(config);
     }
 
     private void init(Config config) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index 490f427..9b0eb82 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -53,7 +53,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.config));
+        this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig()));
         this.collector = collector;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
index 6956ef1..11b6c15 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
@@ -32,7 +32,7 @@ public class TestHbaseTopologyCrawler {
     public void test() {
         Config config = ConfigFactory.load();
 
-        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
         TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
         HbaseTopologyCrawler crawler = new HbaseTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
         crawler.extract();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
index 5069a3b..526756f 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
@@ -32,7 +32,7 @@ public class TestHdfsTopologyCrawler {
     public void test() {
         Config config = ConfigFactory.load();
 
-        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
         TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
         HdfsTopologyCrawler crawler = new HdfsTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
         crawler.extract();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java
index 96d6bb2..8b3c2e4 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java
@@ -32,7 +32,7 @@ public class TestMRTopologyCrawler {
     public void test() {
         Config config = ConfigFactory.load();
 
-        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
         TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
         MRTopologyCrawler crawler = new MRTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
         crawler.extract();

Reply via email to