Repository: incubator-eagle Updated Branches: refs/heads/master 0cd117c1f -> c57f86dd1
[EAGLE-719] fix configuration bug in applications Author: anyway1021 <m...@apache.org> Closes #598 from anyway1021/EAGLE-719. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c57f86dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c57f86dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c57f86dd Branch: refs/heads/master Commit: c57f86dd186a87ec749420e50fdf3180959cb16f Parents: 0cd117c Author: anyway1021 <m...@apache.org> Authored: Wed Nov 2 17:51:39 2016 +0800 Committer: anyway1021 <m...@apache.org> Committed: Wed Nov 2 17:51:39 2016 +0800 ---------------------------------------------------------------------- .../hadoop/queue/HadoopQueueRunningApp.java | 2 +- .../queue/HadoopQueueRunningAppConfig.java | 17 +------ .../jpm/aggregation/AggregationApplication.java | 6 +-- .../jpm/aggregation/AggregationConfig.java | 19 +++----- .../jpm/aggregation/mr/MRMetricAggregator.java | 26 ++++++----- .../mr/MRMetricsAggregateContainer.java | 19 +++++--- .../jpm/aggregation/storm/AggregationBolt.java | 15 +++---- .../jpm/aggregation/storm/AggregationSpout.java | 20 ++++----- .../jpm/mr/history/MRHistoryJobApplication.java | 4 +- .../jpm/mr/history/MRHistoryJobConfig.java | 18 ++------ .../crawler/DefaultJHFInputStreamCallback.java | 11 +++-- .../history/crawler/JHFCrawlerDriverImpl.java | 6 +-- .../metrics/JobCountMetricsGenerator.java | 18 +++++--- .../metrics/JobCounterMetricsGenerator.java | 15 ++++--- .../mr/history/parser/JHFEventReaderBase.java | 10 +++-- .../mr/history/parser/JHFMRVer2EventReader.java | 5 ++- .../jpm/mr/history/parser/JHFParserFactory.java | 16 ++++--- ...JobConfigurationCreationServiceListener.java | 16 ++++--- .../JobEntityCreationEagleServiceListener.java | 21 +++++---- .../parser/TaskAttemptCounterListener.java | 16 ++++--- .../mr/history/parser/TaskFailureListener.java | 16 ++++--- .../jpm/mr/history/storm/JobHistorySpout.java | 47 +++++++++++--------- .../jpm/mr/running/MRRunningJobApplication.java | 2 +- .../jpm/mr/running/MRRunningJobConfig.java | 14 +++--- .../jpm/spark/history/SparkHistoryJobApp.java | 2 +- .../spark/history/SparkHistoryJobAppConfig.java | 10 ++--- .../apache/eagle/topology/TopologyCheckApp.java | 2 +- .../eagle/topology/TopologyCheckAppConfig.java | 16 ++++--- .../topology/storm/TopologyDataPersistBolt.java | 2 +- .../topology/TestHbaseTopologyCrawler.java | 2 +- .../eagle/topology/TestHdfsTopologyCrawler.java | 2 +- .../eagle/topology/TestMRTopologyCrawler.java | 2 +- 32 files changed, 206 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java index 7a853a1..77dc0cb 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -27,7 +27,7 @@ import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; public class HadoopQueueRunningApp extends StormApplication { public StormTopology execute(Config config, StormEnvironment environment) { - HadoopQueueRunningAppConfig appConfig = HadoopQueueRunningAppConfig.getInstance(config); + HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config); IRichSpout spout = new HadoopQueueRunningSpout(appConfig); HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java index e370ce9..d398028 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java @@ -21,7 +21,6 @@ import com.typesafe.config.Config; import java.io.Serializable; public class HadoopQueueRunningAppConfig implements Serializable { - public static final HadoopQueueRunningAppConfig instance = new HadoopQueueRunningAppConfig(); public Topology topology; public DataSourceConfig dataSourceConfig; @@ -29,11 +28,11 @@ public class HadoopQueueRunningAppConfig implements Serializable { private Config config = null; - private HadoopQueueRunningAppConfig() { + public HadoopQueueRunningAppConfig(Config config) { this.topology = new Topology(); this.dataSourceConfig = new DataSourceConfig(); this.eagleProps = new EagleProps(); - this.config = null; + init(config); } public static class Topology implements Serializable { @@ -61,18 +60,6 @@ public class HadoopQueueRunningAppConfig implements Serializable { } } - public static HadoopQueueRunningAppConfig getInstance(Config config) { - if (config != null && instance.config == null) { - synchronized (instance) { - if (instance.config == null) { - instance.init(config); - } - return instance; - } - } - return instance; - } - public Config getConfig() { return config; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java index 750c9ab..ecddbce 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java @@ -54,17 +54,17 @@ public class AggregationApplication extends StormApplication { TopologyBuilder topologyBuilder = new TopologyBuilder(); String spoutName = "mrHistoryAggregationSpout"; String boltName = "mrHistoryAggregationBolt"; - AggregationConfig aggregationConfig = AggregationConfig.getInstance(config); + AggregationConfig aggregationConfig = AggregationConfig.newInstance(config); int tasks = aggregationConfig.getConfig().getInt("stormConfig." + spoutName + "Tasks"); topologyBuilder.setSpout( spoutName, - new AggregationSpout(config, new MRMetricsAggregateContainer(metrics)), + new AggregationSpout(aggregationConfig, new MRMetricsAggregateContainer(metrics, aggregationConfig)), tasks ).setNumTasks(tasks); tasks = aggregationConfig.getConfig().getInt("stormConfig." + boltName + "Tasks"); topologyBuilder.setBolt(boltName, - new AggregationBolt(config, new MRMetricsAggregateContainer(metrics)), + new AggregationBolt(aggregationConfig.getStormConfig(), new MRMetricsAggregateContainer(metrics, aggregationConfig)), tasks).setNumTasks(tasks).shuffleGrouping(spoutName); return topologyBuilder.createTopology(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java index e17c5b6..b4f56bf 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java @@ -72,25 +72,15 @@ public class AggregationConfig implements Serializable { public String password; } - private static AggregationConfig manager = new AggregationConfig(); - - private AggregationConfig() { + private AggregationConfig(Config config) { this.zkStateConfig = new ZKStateConfig(); this.stormConfig = new StormConfig(); this.eagleServiceConfig = new EagleServiceConfig(); - this.config = null; + init(config); } - public static AggregationConfig getInstance(Config config) { - if (config != null && manager.config == null) { - manager.init(config); - } - - return manager; - } - - public static AggregationConfig get() { - return getInstance(null); + public static AggregationConfig newInstance(Config config) { + return new AggregationConfig(config); } /** @@ -98,6 +88,7 @@ public class AggregationConfig implements Serializable { */ private void init(Config config) { this.config = config; + //parse stormConfig this.stormConfig.site = config.getString("siteId"); this.stormConfig.aggregationDuration = config.getLong("stormConfig.aggregationDuration"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java index 4941552..f8840b2 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java @@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.*; +import static org.apache.eagle.jpm.aggregation.AggregationConfig.EagleServiceConfig; + public class MRMetricAggregator implements MetricAggregator, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MRMetricAggregator.class); @@ -39,25 +41,29 @@ public class MRMetricAggregator implements MetricAggregator, Serializable { private List<List<String>> aggregateColumns; //key is AggregatorColumns, value is a map(key is timeStamp, value is metric value) private Map<AggregatorColumns, Map<Long, Long>> aggregateValues; + private AggregationConfig appConfig; + private EagleServiceConfig eagleServiceConfig; - public MRMetricAggregator(String metric, List<List<String>> aggregateColumns) { + public MRMetricAggregator(String metric, List<List<String>> aggregateColumns, AggregationConfig appConfig) { this.metric = metric; this.aggregateColumns = aggregateColumns; this.aggregateValues = new TreeMap<>(); + this.appConfig = appConfig; + eagleServiceConfig = appConfig.getEagleServiceConfig(); } @Override public boolean aggregate(long startTime, long endTime) { LOG.info("start to aggregate {} from {} to {}", metric, startTime, endTime); IEagleServiceClient client = new EagleServiceClientImpl( - AggregationConfig.get().getEagleServiceConfig().eagleServiceHost, - AggregationConfig.get().getEagleServiceConfig().eagleServicePort, - AggregationConfig.get().getEagleServiceConfig().username, - AggregationConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); String query = String.format("%s[@site=\"%s\"]{*}", Constants.GENERIC_METRIC_SERVICE, - AggregationConfig.get().getStormConfig().site); + appConfig.getStormConfig().site); GenericServiceAPIResponseEntity response; try { @@ -115,10 +121,10 @@ public class MRMetricAggregator implements MetricAggregator, Serializable { private boolean flush() { IEagleServiceClient client = new EagleServiceClientImpl( - AggregationConfig.get().getEagleServiceConfig().eagleServiceHost, - AggregationConfig.get().getEagleServiceConfig().eagleServicePort, - AggregationConfig.get().getEagleServiceConfig().username, - AggregationConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); List<GenericMetricEntity> entities = new ArrayList<>(); for (AggregatorColumns aggregatorColumns : this.aggregateValues.keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java index 087e44c..00d0457 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java @@ -34,31 +34,36 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.eagle.jpm.aggregation.AggregationConfig.EagleServiceConfig; + public class MRMetricsAggregateContainer implements MetricsAggregateContainer, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MRMetricsAggregateContainer.class); private Map<String, MetricAggregator> metricAggregators; + private AggregationConfig appConfig; - public MRMetricsAggregateContainer(Map<String, List<List<String>>> metrics) { + public MRMetricsAggregateContainer(Map<String, List<List<String>>> metrics, AggregationConfig appConfig) { this.metricAggregators = new HashMap<>(); //metric name, aggregate columns for (String metric : metrics.keySet()) { - this.metricAggregators.put(metric, new MRMetricAggregator(metric, metrics.get(metric))); + this.metricAggregators.put(metric, new MRMetricAggregator(metric, metrics.get(metric), appConfig)); } + this.appConfig = appConfig; } @Override public long fetchLatestJobProcessTime() { try { + EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - AggregationConfig.get().getEagleServiceConfig().eagleServiceHost, - AggregationConfig.get().getEagleServiceConfig().eagleServicePort, - AggregationConfig.get().getEagleServiceConfig().username, - AggregationConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); String query = String.format("%s[@site=\"%s\"]<@site>{max(currentTimeStamp)}", Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME, - AggregationConfig.get().getStormConfig().site); + appConfig.getStormConfig().site); GenericServiceAPIResponseEntity response = client .search(query) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java index f4521cd..bb2e3de 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationBolt.java @@ -18,33 +18,32 @@ package org.apache.eagle.jpm.aggregation.storm; -import org.apache.eagle.jpm.aggregation.AggregationConfig; -import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; -import com.typesafe.config.Config; +import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.eagle.jpm.aggregation.AggregationConfig.StormConfig; + public class AggregationBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(AggregationBolt.class); - private Config config; + private StormConfig stormConfig; private OutputCollector collector; private MetricsAggregateContainer metricsAggregateContainer; - public AggregationBolt(Config config, MetricsAggregateContainer metricsAggregateContainer) { - this.config = config; + public AggregationBolt(StormConfig stormConfig, MetricsAggregateContainer metricsAggregateContainer) { + this.stormConfig = stormConfig; this.metricsAggregateContainer = metricsAggregateContainer; } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { - AggregationConfig.getInstance(config); this.collector = outputCollector; } @@ -52,7 +51,7 @@ public class AggregationBolt extends BaseRichBolt { public void execute(Tuple tuple) { Long startTime = tuple.getLongByField("startTime"); LOG.info("get startTime {}", startTime); - Long endTime = startTime + AggregationConfig.get().getStormConfig().aggregationDuration * 1000; + Long endTime = startTime + stormConfig.aggregationDuration * 1000; if (metricsAggregateContainer.aggregate(startTime, endTime)) { collector.ack(tuple); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java index 8a2f06a..3ee0519 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java @@ -18,17 +18,16 @@ package org.apache.eagle.jpm.aggregation.storm; -import org.apache.eagle.jpm.aggregation.AggregationConfig; -import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer; -import org.apache.eagle.jpm.aggregation.state.AggregationTimeManager; -import org.apache.eagle.jpm.util.Utils; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import com.typesafe.config.Config; +import org.apache.eagle.jpm.aggregation.AggregationConfig; +import org.apache.eagle.jpm.aggregation.common.MetricsAggregateContainer; +import org.apache.eagle.jpm.aggregation.state.AggregationTimeManager; +import org.apache.eagle.jpm.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,23 +40,22 @@ public class AggregationSpout extends BaseRichSpout { private static final Long MAX_WAIT_TIME = 12 * 60 * 60000L;//12 hours private static final Long MAX_SAFE_TIME = 6 * 60 * 60000L;//6 hours - private Config config; + private AggregationConfig appConfig; MetricsAggregateContainer jobProcessTime; private SpoutOutputCollector collector; private Set<Long> processStartTime; private Long lastUpdateTime; - public AggregationSpout(Config config, MetricsAggregateContainer jobProcessTime) { - this.config = config; + public AggregationSpout(AggregationConfig appConfig, MetricsAggregateContainer jobProcessTime) { + this.appConfig = appConfig; this.jobProcessTime = jobProcessTime; this.processStartTime = new HashSet<>(); } @Override public void open(Map conf, TopologyContext context, final SpoutOutputCollector collector) { - AggregationConfig.getInstance(config); - AggregationTimeManager.instance().init(AggregationConfig.get().getZkStateConfig()); + AggregationTimeManager.instance().init(appConfig.getZkStateConfig()); this.collector = collector; } @@ -90,7 +88,7 @@ public class AggregationSpout extends BaseRichSpout { for (Long startTime = lastUpdateTime; startTime < lastUpdateTime + MAX_SAFE_TIME;) { collector.emit(new Values(startTime), startTime); this.processStartTime.add(startTime); - startTime += AggregationConfig.get().getStormConfig().aggregationDuration * 1000; + startTime += appConfig.getStormConfig().aggregationDuration * 1000; } } catch (Exception e) { LOG.warn("{}", e); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java index aaf65ac..e4e206f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java @@ -37,7 +37,7 @@ public class MRHistoryJobApplication extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { //1. trigger init conf - MRHistoryJobConfig appConfig = MRHistoryJobConfig.getInstance(config); + MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(config); com.typesafe.config.Config jhfAppConf = appConfig.getConfig(); //2. init JobHistoryContentFilter @@ -65,7 +65,7 @@ public class MRHistoryJobApplication extends StormApplication { int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks"); topologyBuilder.setSpout( spoutName, - new JobHistorySpout(filter, config), + new JobHistorySpout(filter, appConfig), tasks ).setNumTasks(tasks); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java index 496aa77..b0c934a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java @@ -85,31 +85,21 @@ public class MRHistoryJobConfig implements Serializable { public int readTimeoutSeconds; } - private static MRHistoryJobConfig manager = new MRHistoryJobConfig(); - /** * As this is singleton object and constructed while this class is being initialized, * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError. * And this is unrecoverable and hard to troubleshooting. */ - private MRHistoryJobConfig() { + private MRHistoryJobConfig(Config config) { this.zkStateConfig = new ZKStateConfig(); this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig(); this.jobHistoryEndpointConfig.hdfs = new HashMap<>(); this.eagleServiceConfig = new EagleServiceConfig(); - this.config = null; - } - - public static MRHistoryJobConfig getInstance(Config config) { - if (config != null && manager.config == null) { - manager.init(config); - } - - return manager; + init(config); } - public static MRHistoryJobConfig get() { - return getInstance(null); + public static MRHistoryJobConfig newInstance(Config config) { + return new MRHistoryJobConfig(config); } /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index af83985..2b5e15c 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -28,16 +28,19 @@ import java.io.InputStream; import java.util.HashMap; import java.util.Map; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; + public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { private static final Logger LOG = LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class); - private JobHistoryContentFilter filter; private EagleOutputCollector collector; + private MRHistoryJobConfig appConfig; - public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector) { + public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector, MRHistoryJobConfig appConfig) { this.filter = filter; this.collector = eagleCollector; + this.appConfig = appConfig; } @Override @@ -45,7 +48,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site); + put("site", appConfig.getJobHistoryEndpointConfig().site); } }; @@ -54,7 +57,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { jobFileInputStream.close(); } else { //get parser and parse, do not need to emit data now - JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter, this.collector); + JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter, this.collector, appConfig); parser.parse(jobFileInputStream); jobFileInputStream.close(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index ef38b43..9c9374d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -61,13 +61,13 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private JobCountMetricsGenerator jobCountMetricsGenerator; public JHFCrawlerDriverImpl(JHFInputStreamCallback reader, - JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { + JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId, MRHistoryJobConfig appConfig) throws Exception { this.reader = reader; jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig); this.partitionId = partitionId; this.jobFilter = jobFilter; - timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone); - jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone); + timeZone = TimeZone.getTimeZone(appConfig.getJobHistoryEndpointConfig().timeZone); + jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone, appConfig); } /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java index f74f77a..573d3cb 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java @@ -32,13 +32,18 @@ import org.slf4j.LoggerFactory; import java.util.*; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig; + public class JobCountMetricsGenerator { private static final Logger LOG = LoggerFactory.getLogger(JobCountMetricsGenerator.class); private TimeZone timeZone; - public JobCountMetricsGenerator(TimeZone timeZone) { + private MRHistoryJobConfig appConfig; + + public JobCountMetricsGenerator(TimeZone timeZone, MRHistoryJobConfig appConfig) { this.timeZone = timeZone; + this.appConfig = appConfig; } public void flush(String date, int year, int month, int day) throws Exception { @@ -58,11 +63,12 @@ public class JobCountMetricsGenerator { } int failed = total - killed - succeeded; + EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); final IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); GregorianCalendar cal = new GregorianCalendar(year, month, day); @@ -88,7 +94,7 @@ public class JobCountMetricsGenerator { @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site); + put("site", appConfig.getJobHistoryEndpointConfig().site); put(MRJobTagName.JOB_STATUS.toString(), state); } }; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java index 6291b37..6020a3b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java @@ -34,6 +34,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig; + public class JobCounterMetricsGenerator { private static final Logger LOG = LoggerFactory.getLogger(JobCounterMetricsGenerator.class); private static final int BATCH_SIZE = 1000; @@ -45,8 +47,11 @@ public class JobCounterMetricsGenerator { private List<GenericMetricEntity> lastEntitiesBatch; private Map<String, String> baseTags; - public JobCounterMetricsGenerator() { + private EagleServiceConfig eagleServiceConfig; + + public JobCounterMetricsGenerator(EagleServiceConfig eagleServiceConfig) { this.lastEntitiesBatch = null; + this.eagleServiceConfig = eagleServiceConfig; } public void setBaseTags(Map<String, String> tags) { @@ -115,10 +120,10 @@ public class JobCounterMetricsGenerator { } IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); for (List<GenericMetricEntity> entities : metricEntities) { LOG.info("start flushing entities of total number " + entities.size()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index 8b35a56..e48370d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -79,6 +79,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl private JobCounterMetricsGenerator jobCounterMetricsGenerator; + private MRHistoryJobConfig appConfig; + /** * baseTags stores the basic tag name values which might be used for persisting various entities. * baseTags includes: site and jobName @@ -86,7 +88,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl * * @param baseTags */ - public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { + public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) { this.filter = filter; this.baseTags = baseTags; @@ -119,7 +121,9 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } this.sumMapTaskDuration = 0L; this.sumReduceTaskDuration = 0L; - this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(); + + this.appConfig = appConfig; + this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(appConfig.getEagleServiceConfig()); } public void register(HistoryJobEntityLifecycleListener lifecycleListener) { @@ -149,7 +153,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } private String buildJobTrackingUrl(String jobId) { - String jobTrackingUrlBase = MRHistoryJobConfig.getInstance(null).getJobHistoryEndpointConfig().mrHistoryServerUrl + "/jobhistory/job/"; + String jobTrackingUrlBase = appConfig.getJobHistoryEndpointConfig().mrHistoryServerUrl + "/jobhistory/job/"; try { URI oldUri = new URI(jobTrackingUrlBase); URI resolved = oldUri.resolve(jobId); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java index 5324c02..6e0e3aa 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.history.parser; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.hadoop.classification.InterfaceAudience; @@ -42,8 +43,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { * * @throws IOException */ - public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { - super(baseTags, configuration, filter); + public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) { + super(baseTags, configuration, filter, appConfig); } @SuppressWarnings("deprecation") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java index fd5483b..784d107 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.history.parser; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.hadoop.conf.Configuration; @@ -33,12 +34,15 @@ public class JHFParserFactory { public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, - EagleOutputCollector outputCollector) { - JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter); - reader2.addListener(new JobEntityCreationEagleServiceListener(outputCollector)); - reader2.addListener(new TaskFailureListener()); - reader2.addListener(new TaskAttemptCounterListener()); - reader2.addListener(new JobConfigurationCreationServiceListener()); + EagleOutputCollector outputCollector, + MRHistoryJobConfig appConfig) { + MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); + + JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, appConfig); + reader2.addListener(new JobEntityCreationEagleServiceListener(outputCollector, appConfig)); + reader2.addListener(new TaskFailureListener(eagleServiceConfig)); + reader2.addListener(new TaskAttemptCounterListener(eagleServiceConfig)); + reader2.addListener(new JobConfigurationCreationServiceListener(eagleServiceConfig)); reader2.register(new JobEntityLifecycleAggregator()); JHFParserBase parser = new JHFMRVer2Parser(reader2); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java index 28020b0..96f2b3b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java @@ -29,12 +29,16 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig; + public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener { private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class); private static final int MAX_RETRY_TIMES = 3; private JobConfigurationAPIEntity jobConfigurationEntity; + private EagleServiceConfig eagleServiceConfig; - public JobConfigurationCreationServiceListener() { + public JobConfigurationCreationServiceListener(EagleServiceConfig eagleServiceConfig) { + this.eagleServiceConfig = eagleServiceConfig; } @Override @@ -54,12 +58,12 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity @Override public void flush() throws Exception { IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); List<JobConfigurationAPIEntity> list = new ArrayList<>(); list.add(jobConfigurationEntity); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 2c7a2b1..902590b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -49,18 +49,20 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener(); private TimeZone timeZone; private EagleOutputCollector collector; + private MRHistoryJobConfig appConfig; - public JobEntityCreationEagleServiceListener(EagleOutputCollector collector) { - this(BATCH_SIZE, collector); + public JobEntityCreationEagleServiceListener(EagleOutputCollector collector, MRHistoryJobConfig appConfig) { + this(BATCH_SIZE, collector, appConfig); } - public JobEntityCreationEagleServiceListener(int batchSize, EagleOutputCollector collector) { + public JobEntityCreationEagleServiceListener(int batchSize, EagleOutputCollector collector, MRHistoryJobConfig appConfig) { if (batchSize <= 0) { throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided"); } this.batchSize = batchSize; this.collector = collector; - timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone); + this.appConfig = appConfig; + timeZone = TimeZone.getTimeZone(appConfig.getJobHistoryEndpointConfig().timeZone); } @Override @@ -86,13 +88,14 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr */ @Override public void flush() throws Exception { + MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); logger.info("start flushing entities of total number " + list.size()); List<GenericMetricEntity> metricEntities = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java index e663b29..666b3db 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -30,12 +30,16 @@ import org.slf4j.LoggerFactory; import java.util.*; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig; + public class TaskAttemptCounterListener implements HistoryJobEntityCreationListener { private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class); private static final int BATCH_SIZE = 1000; private Map<CounterKey, CounterValue> counters = new HashMap<>(); + private EagleServiceConfig eagleServiceConfig; - public TaskAttemptCounterListener() { + public TaskAttemptCounterListener(EagleServiceConfig eagleServiceConfig) { + this.eagleServiceConfig = eagleServiceConfig; } private static class CounterKey { @@ -111,12 +115,12 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe @Override public void flush() throws Exception { IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); List<TaskAttemptCounterAPIEntity> list = new ArrayList<>(); logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size()); // create entity http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index ef62a6c..46056c4 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -36,6 +36,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.EagleServiceConfig; + public class TaskFailureListener implements HistoryJobEntityCreationListener { private static final Logger LOG = LoggerFactory.getLogger(TaskFailureListener.class); private static final String MR_ERROR_CATEGORY_CONFIG_FILE_NAME = "MRErrorCategory.config"; @@ -44,8 +46,10 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>(); private final MRErrorClassifier classifier; + private EagleServiceConfig eagleServiceConfig; - public TaskFailureListener() { + public TaskFailureListener(EagleServiceConfig eagleServiceConfig) { + this.eagleServiceConfig = eagleServiceConfig; InputStream is = null; try { is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME); @@ -109,12 +113,12 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { @Override public void flush() throws Exception { IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); int tried = 0; while (tried <= MAX_RETRY_TIMES) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 8dc951f..436dbeb 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -18,6 +18,10 @@ package org.apache.eagle.jpm.mr.history.storm; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.*; @@ -26,14 +30,8 @@ import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity; import org.apache.eagle.jpm.util.DefaultJobIdPartitioner; import org.apache.eagle.jpm.util.JobIdFilter; import org.apache.eagle.jpm.util.JobIdFilterByPartition; -import org.apache.eagle.jpm.util.JobIdPartitioner; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +40,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; + /** * Zookeeper znode structure @@ -95,20 +95,22 @@ public class JobHistorySpout extends BaseRichSpout { private JHFInputStreamCallback callback; private JobHistoryLCM jhfLCM; private static final int MAX_RETRY_TIMES = 3; - private Config config; + private MRHistoryJobConfig appConfig; + private JobHistoryEndpointConfig jobHistoryEndpointConfig; - public JobHistorySpout(JobHistoryContentFilter filter, Config config) { - this(filter, config, new JobHistorySpoutCollectorInterceptor()); + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) { + this(filter, appConfig, new JobHistorySpoutCollectorInterceptor()); } /** * mostly this constructor signature is for unit test purpose as you can put customized interceptor here. */ - public JobHistorySpout(JobHistoryContentFilter filter, Config config, JobHistorySpoutCollectorInterceptor adaptor) { + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig appConfig, JobHistorySpoutCollectorInterceptor adaptor) { this.contentFilter = filter; - this.config = config; this.interceptor = adaptor; - callback = new DefaultJHFInputStreamCallback(contentFilter, interceptor); + this.appConfig = appConfig; + jobHistoryEndpointConfig = appConfig.getJobHistoryEndpointConfig(); + callback = new DefaultJHFInputStreamCallback(contentFilter, interceptor, appConfig); } private int calculatePartitionId(TopologyContext context) { @@ -129,7 +131,6 @@ public class JobHistorySpout extends BaseRichSpout { @Override public void open(Map conf, TopologyContext context, final SpoutOutputCollector collector) { - MRHistoryJobConfig.getInstance(config); partitionId = calculatePartitionId(context); // sanity verify 0<=partitionId<=numTotalPartitions-1 if (partitionId < 0 || partitionId > numTotalPartitions) { @@ -137,17 +138,18 @@ public class JobHistorySpout extends BaseRichSpout { + partitionId + " and numTotalPartitions " + numTotalPartitions); } JobIdFilter jobIdFilter = new JobIdFilterByPartition(new DefaultJobIdPartitioner(), numTotalPartitions, partitionId); - JobHistoryZKStateManager.instance().init(MRHistoryJobConfig.get().getZkStateConfig()); + JobHistoryZKStateManager.instance().init(appConfig.getZkStateConfig()); JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions); interceptor.setSpoutOutputCollector(collector); try { - jhfLCM = new JobHistoryDAOImpl(MRHistoryJobConfig.get().getJobHistoryEndpointConfig()); + jhfLCM = new JobHistoryDAOImpl(jobHistoryEndpointConfig); driver = new JHFCrawlerDriverImpl( callback, jhfLCM, jobIdFilter, - partitionId); + partitionId, + appConfig); } catch (Exception e) { LOG.error("failing creating crawler driver"); throw new IllegalStateException(e); @@ -227,7 +229,7 @@ public class JobHistorySpout extends BaseRichSpout { LOG.info("update process time stamp {}", minTimeStamp); Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site); + put("site", jobHistoryEndpointConfig.site); } }; JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity(); @@ -235,13 +237,14 @@ public class JobHistorySpout extends BaseRichSpout { entity.setTimestamp(minTimeStamp); entity.setTags(baseTags); + MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); IEagleServiceClient client = new EagleServiceClientImpl( - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost, - MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort, - MRHistoryJobConfig.get().getEagleServiceConfig().username, - MRHistoryJobConfig.get().getEagleServiceConfig().password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000); + client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); List<JobProcessTimeStampEntity> entities = new ArrayList<>(); entities.add(entity); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java index 16e8ea7..bfdde13 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java @@ -33,7 +33,7 @@ public class MRRunningJobApplication extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { //1. trigger init conf - MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.getInstance(config); + MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config); String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(","); List<String> confKeyKeys = new ArrayList<>(confKeyPatternsSplit.length); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java index 10d8f76..975e821 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java @@ -78,27 +78,25 @@ public class MRRunningJobConfig implements Serializable { private Config config; - private static MRRunningJobConfig manager = new MRRunningJobConfig(); - - private MRRunningJobConfig() { + private MRRunningJobConfig(Config config) { this.eagleServiceConfig = new EagleServiceConfig(); this.endpointConfig = new EndpointConfig(); this.zkStateConfig = new ZKStateConfig(); + init(config); } - public static MRRunningJobConfig getInstance(String[] args) { + public static MRRunningJobConfig newInstance(String[] args) { try { LOG.info("Loading from configuration file"); - return getInstance(new ConfigOptionParser().load(args)); + return newInstance(new ConfigOptionParser().load(args)); } catch (Exception e) { LOG.error("failed to load config"); throw new IllegalArgumentException("Failed to load config", e); } } - public static MRRunningJobConfig getInstance(Config config) { - manager.init(config); - return manager; + public static MRRunningJobConfig newInstance(Config config) { + return new MRRunningJobConfig(config); } private void init(Config config) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java index 8a3097d..e30a0d8 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java @@ -29,7 +29,7 @@ public class SparkHistoryJobApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { // 1. Init conf - SparkHistoryJobAppConfig sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.getInstance(config); + SparkHistoryJobAppConfig sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config); final String jobFetchSpoutName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_FETCH_SPOUT_NAME; final String jobParseBoltName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_PARSE_BOLT_NAME; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java index 5049b40..86f13ff 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java @@ -40,23 +40,21 @@ public class SparkHistoryJobAppConfig implements Serializable { private Config config; - private static SparkHistoryJobAppConfig manager = new SparkHistoryJobAppConfig(); - public Config getConfig() { return config; } - public SparkHistoryJobAppConfig() { + private SparkHistoryJobAppConfig(Config config) { this.zkStateConfig = new ZKStateConfig(); this.jobHistoryConfig = new JobHistoryEndpointConfig(); this.jobHistoryConfig.hdfs = new HashMap<>(); this.eagleInfo = new EagleInfo(); this.stormConfig = new StormConfig(); + init(config); } - public static SparkHistoryJobAppConfig getInstance(Config config) { - manager.init(config); - return manager; + public static SparkHistoryJobAppConfig newInstance(Config config) { + return new SparkHistoryJobAppConfig(config); } private void init(Config config) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java index 7760e3d..95bcb4d 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java @@ -29,7 +29,7 @@ import org.apache.eagle.topology.storm.TopologyDataPersistBolt; public class TopologyCheckApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { - TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config); + TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); String spoutName = TopologyCheckAppConfig.TOPOLOGY_DATA_FETCH_SPOUT_NAME; String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java index f6510da..409a87f 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java @@ -44,21 +44,23 @@ public class TopologyCheckAppConfig implements Serializable { public MRConfig mrConfig; public List<TopologyConstants.TopologyType> topologyTypes; - public Config config; + private Config config; - private static TopologyCheckAppConfig configManager = new TopologyCheckAppConfig(); - - private TopologyCheckAppConfig() { + private TopologyCheckAppConfig(Config config) { hBaseConfig = null; hdfsConfig = null; mrConfig = null; dataExtractorConfig = new DataExtractorConfig(); topologyTypes = new ArrayList<>(); + init(config); + } + + public Config getConfig() { + return config; } - public static TopologyCheckAppConfig getInstance(Config config) { - configManager.init(config); - return configManager; + public static TopologyCheckAppConfig newInstance(Config config) { + return new TopologyCheckAppConfig(config); } private void init(Config config) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java index 490f427..9b0eb82 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java @@ -53,7 +53,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.config)); + this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig())); this.collector = collector; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java index 6956ef1..11b6c15 100644 --- a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java @@ -32,7 +32,7 @@ public class TestHbaseTopologyCrawler { public void test() { Config config = ConfigFactory.load(); - TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config); + TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); HbaseTopologyCrawler crawler = new HbaseTopologyCrawler(topologyCheckAppConfig, rackResolver, null); crawler.extract(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java index 5069a3b..526756f 100644 --- a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java @@ -32,7 +32,7 @@ public class TestHdfsTopologyCrawler { public void test() { Config config = ConfigFactory.load(); - TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config); + TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); HdfsTopologyCrawler crawler = new HdfsTopologyCrawler(topologyCheckAppConfig, rackResolver, null); crawler.extract(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c57f86dd/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java index 96d6bb2..8b3c2e4 100644 --- a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestMRTopologyCrawler.java @@ -32,7 +32,7 @@ public class TestMRTopologyCrawler { public void test() { Config config = ConfigFactory.load(); - TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config); + TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); MRTopologyCrawler crawler = new MRTopologyCrawler(topologyCheckAppConfig, rackResolver, null); crawler.extract();