[EAGLE-496] fix code style of jpm Author: wujinhu <wujinhu...@126.com>
Closes #383 from wujinhu/EAGLE-496. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0b852cbc Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0b852cbc Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0b852cbc Branch: refs/heads/master Commit: 0b852cbcd4243e71f51ae7a42c68e1ce7571545e Parents: 6a55b59 Author: wujinhu <wujinhu...@126.com> Authored: Wed Aug 24 20:22:58 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Wed Aug 24 20:22:58 2016 +0800 ---------------------------------------------------------------------- .../queue/common/HadoopYarnResourceUtils.java | 2 +- .../queue/common/YarnURLSelectorImpl.java | 2 +- .../storm/HadoopQueueRunningExtractor.java | 2 +- .../eagle/jpm/mr/historyentity/JobConfig.java | 2 +- .../jpm/mr/historyentity/JobConfigSerDeser.java | 15 +- .../JobConfigurationAPIEntity.java | 13 +- .../jpm/mr/historyentity/JobEventAPIEntity.java | 4 +- .../mr/historyentity/JobExecutionAPIEntity.java | 23 +- .../JobProcessTimeStampEntity.java | 3 +- .../TaskAttemptCounterAPIEntity.java | 7 +- .../TaskAttemptExecutionAPIEntity.java | 17 +- .../historyentity/TaskExecutionAPIEntity.java | 17 +- .../TaskFailureCountAPIEntity.java | 3 +- .../mr/runningentity/JobExecutionAPIEntity.java | 10 +- .../TaskAttemptExecutionAPIEntity.java | 6 +- .../runningentity/TaskExecutionAPIEntity.java | 6 +- .../jpm/spark/crawl/JHFInputStreamReader.java | 3 +- .../eagle/jpm/spark/crawl/JHFParserBase.java | 4 +- .../jpm/spark/crawl/JHFSparkEventReader.java | 82 +++-- .../eagle/jpm/spark/crawl/JHFSparkParser.java | 12 +- .../SparkFilesystemInputStreamReaderImpl.java | 4 +- .../eagle/jpm/spark/entity/JobConfig.java | 3 +- .../apache/eagle/jpm/spark/entity/SparkApp.java | 11 +- .../eagle/jpm/spark/entity/SparkExecutor.java | 4 +- .../apache/eagle/jpm/spark/entity/SparkJob.java | 24 +- .../eagle/jpm/spark/entity/SparkStage.java | 32 +- .../eagle/jpm/spark/entity/SparkTask.java | 4 +- .../history/crawler/AbstractJobHistoryDAO.java | 61 ++-- .../crawler/DefaultJHFInputStreamCallback.java | 24 +- .../history/crawler/EagleOutputCollector.java | 6 +- .../mr/history/crawler/JHFCrawlerDriver.java | 4 +- .../history/crawler/JHFCrawlerDriverImpl.java | 193 +++++------ .../history/crawler/JHFInputStreamCallback.java | 4 +- .../crawler/JobHistoryContentFilter.java | 5 + .../crawler/JobHistoryContentFilterBuilder.java | 53 +-- .../crawler/JobHistoryContentFilterImpl.java | 42 +-- .../mr/history/crawler/JobHistoryDAOImpl.java | 80 ++--- .../jpm/mr/history/crawler/JobHistoryLCM.java | 24 +- .../JobHistorySpoutCollectorInterceptor.java | 8 +- .../HistoryJobEntityCreationListener.java | 6 +- .../mr/history/parser/JHFEventReaderBase.java | 319 ++++++++++--------- .../mr/history/parser/JHFMRVer1EventReader.java | 10 +- .../jpm/mr/history/parser/JHFMRVer1Parser.java | 15 +- .../mr/history/parser/JHFMRVer2EventReader.java | 5 +- .../jpm/mr/history/parser/JHFMRVer2Parser.java | 10 +- .../jpm/mr/history/parser/JHFParserBase.java | 5 +- .../parser/JHFWriteNotCompletedException.java | 6 +- ...JobConfigurationCreationServiceListener.java | 8 +- .../JobEntityCreationEagleServiceListener.java | 2 +- .../parser/JobEntityCreationPublisher.java | 5 - .../parser/JobEntityLifecycleAggregator.java | 68 ++-- .../mr/history/parser/TaskFailureListener.java | 13 +- .../eagle/jpm/mr/running/MRRunningJobMain.java | 11 +- .../running/config/MRRunningConfigManager.java | 13 +- .../jpm/mr/running/parser/MRJobParser.java | 62 ++-- .../mr/running/recover/MRRunningJobManager.java | 8 +- .../running/storm/MRRunningJobFetchSpout.java | 6 +- .../mr/running/storm/MRRunningJobParseBolt.java | 6 +- .../history/config/SparkHistoryCrawlConfig.java | 3 +- .../status/JobHistoryZKStateManager.java | 153 +++++---- .../spark/history/status/ZKStateConstant.java | 2 +- .../history/storm/FinishedSparkJobSpout.java | 23 +- .../history/storm/SparkHistoryTopology.java | 14 +- .../spark/history/storm/SparkJobParseBolt.java | 16 +- .../eagle/jpm/spark/history/storm/TestHDFS.java | 4 +- .../jpm/spark/running/SparkRunningJobApp.java | 9 +- .../spark/running/SparkRunningJobAppConfig.java | 17 +- .../spark/running/entities/SparkAppEntity.java | 11 +- .../running/entities/SparkExecutorEntity.java | 2 +- .../spark/running/entities/SparkJobEntity.java | 2 +- .../running/entities/SparkStageEntity.java | 24 +- .../spark/running/entities/SparkTaskEntity.java | 2 +- .../running/parser/SparkApplicationParser.java | 44 +-- .../running/recover/SparkRunningJobManager.java | 7 +- .../storm/SparkRunningJobFetchSpout.java | 6 +- .../running/storm/SparkRunningJobParseBolt.java | 6 +- .../org/apache/eagle/jpm/util/Constants.java | 35 +- .../org/apache/eagle/jpm/util/HDFSUtil.java | 12 +- .../org/apache/eagle/jpm/util/JSONUtil.java | 30 +- .../eagle/jpm/util/JobNameNormalization.java | 178 ++++++----- .../org/apache/eagle/jpm/util/MRJobTagName.java | 5 +- .../eagle/jpm/util/SparkEntityConstant.java | 4 +- .../apache/eagle/jpm/util/SparkJobTagName.java | 5 +- .../java/org/apache/eagle/jpm/util/Utils.java | 16 +- .../util/jobcounter/CounterGroupDictionary.java | 13 +- .../jpm/util/jobcounter/CounterGroupKey.java | 7 +- .../eagle/jpm/util/jobcounter/CounterKey.java | 5 +- .../util/jobcounter/JobCounterException.java | 12 +- .../eagle/jpm/util/jobcounter/JobCounters.java | 2 +- .../util/jobcounter/JobCountersSerDeser.java | 2 +- .../jpm/util/jobrecover/RunningJobManager.java | 15 +- .../util/resourceFetch/RMResourceFetcher.java | 305 +++++++++--------- .../jpm/util/resourceFetch/ResourceFetcher.java | 4 +- .../SparkHistoryServerResourceFetcher.java | 29 +- .../connection/InputStreamUtils.java | 77 +++-- .../util/resourceFetch/connection/JobUtils.java | 42 +-- .../connection/URLConnectionUtils.java | 121 ++++--- .../resourceFetch/ha/AbstractURLSelector.java | 32 +- .../util/resourceFetch/ha/HAURLSelector.java | 14 +- .../resourceFetch/ha/HAURLSelectorImpl.java | 150 ++++----- .../jpm/util/resourceFetch/model/AppInfo.java | 251 ++++++++------- .../util/resourceFetch/model/Applications.java | 20 +- .../util/resourceFetch/model/AppsWrapper.java | 22 +- .../util/resourceFetch/model/ClusterInfo.java | 4 +- .../resourceFetch/model/ClusterInfoWrapper.java | 4 +- .../resourceFetch/model/JobCounterGroup.java | 8 +- .../resourceFetch/model/JobCounterItem.java | 11 +- .../util/resourceFetch/model/JobCounters.java | 8 +- .../resourceFetch/model/JobCountersWrapper.java | 4 +- .../jpm/util/resourceFetch/model/MRJob.java | 4 +- .../util/resourceFetch/model/MRJobsWrapper.java | 4 +- .../jpm/util/resourceFetch/model/MRTask.java | 4 +- .../util/resourceFetch/model/MRTaskAttempt.java | 4 +- .../model/MRTaskAttemptWrapper.java | 4 +- .../resourceFetch/model/MRTaskAttempts.java | 4 +- .../jpm/util/resourceFetch/model/MRTasks.java | 4 +- .../resourceFetch/model/MRTasksWrapper.java | 4 +- .../jpm/util/resourceFetch/model/MrJobs.java | 4 +- .../resourceFetch/model/SparkApplication.java | 4 +- .../model/SparkApplicationAttempt.java | 4 +- .../model/SparkApplicationWrapper.java | 4 +- .../util/resourceFetch/model/SparkExecutor.java | 4 +- .../jpm/util/resourceFetch/model/SparkJob.java | 4 +- .../util/resourceFetch/model/SparkStage.java | 4 +- .../jpm/util/resourceFetch/model/SparkTask.java | 4 +- .../model/SparkTaskInputMetrics.java | 4 +- .../resourceFetch/model/SparkTaskMetrics.java | 4 +- .../model/SparkTaskShuffleReadMetrics.java | 4 +- .../model/SparkTaskShuffleWriteMetrics.java | 4 +- .../resourceFetch/model/TaskCounterGroup.java | 5 +- .../resourceFetch/model/TaskCounterItem.java | 4 +- .../util/resourceFetch/model/TaskCounters.java | 5 +- .../model/TaskCountersWrapper.java | 4 +- .../url/JobListServiceURLBuilderImpl.java | 56 ++-- .../resourceFetch/url/ServiceURLBuilder.java | 4 +- .../SparkCompleteJobServiceURLBuilderImpl.java | 2 +- .../url/SparkJobServiceURLBuilderImpl.java | 2 +- .../jpm/util/resourceFetch/url/URLUtil.java | 2 +- .../hive/jobrunning/HiveJobFetchSpout.java | 12 +- 139 files changed, 1783 insertions(+), 1582 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java index f2c4b1f..2802449 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java @@ -20,7 +20,7 @@ package org.apache.eagle.hadoop.queue.common; import com.typesafe.config.Config; import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils; +import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.ObjectMapper; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java index 05e3be9..02f67d4 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java @@ -19,7 +19,7 @@ package org.apache.eagle.hadoop.queue.common; import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.jpm.util.resourceFetch.ha.AbstractURLSelector; +import org.apache.eagle.jpm.util.resourcefetch.ha.AbstractURLSelector; public class YarnURLSelectorImpl extends AbstractURLSelector { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java index 975e633..3c4391b 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java @@ -27,7 +27,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils; import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder; import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl; import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector; +import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java index 97ebd50..35f346b 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfig.java @@ -32,7 +32,7 @@ public final class JobConfig { this.config = config; } - public String toString(){ + public String toString() { return config.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java index 5af4377..cfa50f9 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigSerDeser.java @@ -35,11 +35,15 @@ public class JobConfigSerDeser implements EntitySerDeser<JobConfig> { String sb = Bytes.toString(bytes); String[] keyValue = sb.split(","); for (String pair : keyValue) { - String str[] = pair.split(":"); - if (pair.equals("") || str[0].equals("")) continue; + String[] str = pair.split(":"); + if (pair.equals("") || str[0].equals("")) { + continue; + } String key = str[0]; String value = ""; - if (str.length == 2) value = str[1]; + if (str.length == 2) { + value = str[1]; + } map.put(key, value); } return jc; @@ -49,14 +53,15 @@ public class JobConfigSerDeser implements EntitySerDeser<JobConfig> { public byte[] serialize(JobConfig conf) { Map<String, String> map = conf.getConfig(); StringBuilder sb = new StringBuilder(); - for (Entry<String, String> entry : map.entrySet()) + for (Entry<String, String> entry : map.entrySet()) { sb.append(entry.getKey() + ":" + entry.getValue() + ","); + } sb.deleteCharAt(sb.length() - 1); return sb.toString().getBytes(); } @Override - public Class<JobConfig> type(){ + public Class<JobConfig> type() { return JobConfig.class; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java index 3a09c5f..d186fd4 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobConfigurationAPIEntity.java @@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa") @ColumnFamily("f") @Prefix("jconf") @@ -30,9 +30,9 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true), - @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false) -}) + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false) + }) public class JobConfigurationAPIEntity extends JobBaseAPIEntity { @Column("a") @@ -45,20 +45,25 @@ public class JobConfigurationAPIEntity extends JobBaseAPIEntity { public JobConfig getJobConfig() { return jobConfig; } + public void setJobConfig(JobConfig jobConfig) { this.jobConfig = jobConfig; _pcs.firePropertyChange("jobConfig", null, null); } + public String getConfigJobName() { return configJobName; } + public void setConfigJobName(String configJobName) { this.configJobName = configJobName; _pcs.firePropertyChange("configJobName", null, null); } + public String getAlertEmailList() { return alertEmailList; } + public void setAlertEmailList(String alertEmailList) { this.alertEmailList = alertEmailList; _pcs.firePropertyChange("alertEmailList", null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java index b289a9c..c6bb8e4 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobEventAPIEntity.java @@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa") @ColumnFamily("f") @Prefix("jevent") @@ -30,13 +30,13 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) public class JobEventAPIEntity extends JobBaseAPIEntity { - @Column("a") private String eventType; public String getEventType() { return eventType; } + public void setEventType(String eventType) { this.eventType = eventType; _pcs.firePropertyChange("eventType", null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java index d9093ff..1f75f07 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java @@ -23,7 +23,7 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa") @ColumnFamily("f") @Prefix("jexec") @@ -31,8 +31,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true), - @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false) + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false) }) public class JobExecutionAPIEntity extends JobBaseAPIEntity { @Column("a") @@ -85,6 +85,7 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity { public long getDurationTime() { return durationTime; } + public void setDurationTime(long durationTime) { this.durationTime = durationTime; valueChanged("durationTime"); @@ -93,59 +94,75 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity { public String getCurrentState() { return currentState; } + public void setCurrentState(String currentState) { this.currentState = currentState; _pcs.firePropertyChange("currentState", null, null); } + public long getStartTime() { return startTime; } + public void setStartTime(long startTime) { this.startTime = startTime; _pcs.firePropertyChange("startTime", null, null); } + public long getEndTime() { return endTime; } + public void setEndTime(long endTime) { this.endTime = endTime; _pcs.firePropertyChange("endTime", null, null); } + public int getNumTotalMaps() { return numTotalMaps; } + public void setNumTotalMaps(int numTotalMaps) { this.numTotalMaps = numTotalMaps; _pcs.firePropertyChange("numTotalMaps", null, null); } + public int getNumFailedMaps() { return numFailedMaps; } + public void setNumFailedMaps(int numFailedMaps) { this.numFailedMaps = numFailedMaps; _pcs.firePropertyChange("numFailedMaps", null, null); } + public int getNumFinishedMaps() { return numFinishedMaps; } + public void setNumFinishedMaps(int numFinishedMaps) { this.numFinishedMaps = numFinishedMaps; _pcs.firePropertyChange("numFinishedMaps", null, null); } + public int getNumTotalReduces() { return numTotalReduces; } + public void setNumTotalReduces(int numTotalReduces) { this.numTotalReduces = numTotalReduces; _pcs.firePropertyChange("numTotalReduces", null, null); } + public int getNumFailedReduces() { return numFailedReduces; } + public void setNumFailedReduces(int numFailedReduces) { this.numFailedReduces = numFailedReduces; _pcs.firePropertyChange("numFailedReduces", null, null); } + public int getNumFinishedReduces() { return numFinishedReduces; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java index df57657..6afe347 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobProcessTimeStampEntity.java @@ -23,7 +23,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa_process") @ColumnFamily("f") @Prefix("process") @@ -37,6 +37,7 @@ public class JobProcessTimeStampEntity extends TaggedLogAPIEntity { public long getCurrentTimeStamp() { return currentTimeStamp; } + public void setCurrentTimeStamp(long currentTimeStamp) { this.currentTimeStamp = currentTimeStamp; _pcs.firePropertyChange("currentTimeStamp", null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java index 89272bf..e526f45 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptCounterAPIEntity.java @@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa_anomaly") @ColumnFamily("f") @Prefix("tacount") @@ -40,20 +40,25 @@ public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity { public int getKilledCount() { return killedCount; } + public void setKilledCount(int killedCount) { this.killedCount = killedCount; _pcs.firePropertyChange("killedCount", null, null); } + public int getFailedCount() { return failedCount; } + public void setFailedCount(int failedCount) { this.failedCount = failedCount; _pcs.firePropertyChange("failedCount", null, null); } + public int getTotalCount() { return totalCount; } + public void setTotalCount(int totalCount) { this.totalCount = totalCount; _pcs.firePropertyChange("totalCount", null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java index be5566b..620ee1f 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java @@ -23,7 +23,7 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa_task") @ColumnFamily("f") @Prefix("taexec") @@ -31,7 +31,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobID" }, unique = false) + @Index(name = "Index_1_jobId", columns = { "jobID" }, unique = false) }) public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { @Column("a") @@ -52,48 +52,61 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public String getTaskStatus() { return taskStatus; } + public void setTaskStatus(String taskStatus) { this.taskStatus = taskStatus; _pcs.firePropertyChange("taskStatus", null, null); } + public long getStartTime() { return startTime; } + public void setStartTime(long startTime) { this.startTime = startTime; _pcs.firePropertyChange("startTime", null, null); } + public long getEndTime() { return endTime; } + public void setEndTime(long endTime) { this.endTime = endTime; _pcs.firePropertyChange("endTime", null, null); } + public long getDuration() { return duration; } + public void setDuration(long duration) { this.duration = duration; _pcs.firePropertyChange("duration", null, null); } + public String getError() { return error; } + public void setError(String error) { this.error = error; _pcs.firePropertyChange("error", null, null); } + public JobCounters getJobCounters() { return jobCounters; } + public void setJobCounters(JobCounters jobCounters) { this.jobCounters = jobCounters; _pcs.firePropertyChange("jobCounters", null, null); } + public String getTaskAttemptID() { return taskAttemptID; } + public void setTaskAttemptID(String taskAttemptID) { this.taskAttemptID = taskAttemptID; _pcs.firePropertyChange("taskAttemptID", null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java index 9de8b05..bf559d4 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskExecutionAPIEntity.java @@ -23,7 +23,7 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa_task") @ColumnFamily("f") @Prefix("texec") @@ -31,8 +31,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false) -}) + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false) + }) public class TaskExecutionAPIEntity extends JobBaseAPIEntity { @Column("a") private String taskStatus; @@ -50,41 +50,52 @@ public class TaskExecutionAPIEntity extends JobBaseAPIEntity { public String getTaskStatus() { return taskStatus; } + public void setTaskStatus(String taskStatus) { this.taskStatus = taskStatus; _pcs.firePropertyChange("taskStatus", null, null); } + public long getStartTime() { return startTime; } + public void setStartTime(long startTime) { this.startTime = startTime; _pcs.firePropertyChange("startTime", null, null); } + public long getEndTime() { return endTime; } + public void setEndTime(long endTime) { this.endTime = endTime; _pcs.firePropertyChange("endTime", null, null); } + public long getDuration() { return duration; } + public void setDuration(long duration) { this.duration = duration; _pcs.firePropertyChange("duration", null, null); } + public String getError() { return error; } + public void setError(String error) { this.error = error; _pcs.firePropertyChange("error", null, null); } + public JobCounters getJobCounters() { return jobCounters; } + public void setJobCounters(JobCounters jobCounters) { this.jobCounters = jobCounters; _pcs.firePropertyChange("jobCounters", null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java index 1445a24..31f96da 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java @@ -22,7 +22,7 @@ import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eaglejpa_anomaly") @ColumnFamily("f") @Prefix("taskfailurecount") @@ -37,7 +37,6 @@ public class TaskFailureCountAPIEntity extends JobBaseAPIEntity { @Column("c") private String taskStatus; - public String getTaskStatus() { return taskStatus; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java index 653f1c9..86b6554 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java @@ -20,12 +20,12 @@ package org.apache.eagle.jpm.mr.runningentity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.jobcounter.JobCounters; -import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo; +import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eagleMRRunningJobs") @ColumnFamily("f") @Prefix("jobs") @@ -33,9 +33,9 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true), - @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false) -}) + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false) + }) @Tags({"site", "jobId", "jobName", "jobDefId", "jobType", "user", "queue"}) public class JobExecutionAPIEntity extends TaggedLogAPIEntity { @Column("a") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java index 11a8b4c..088869f 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java @@ -23,7 +23,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eagleMRRunningTasks") @ColumnFamily("f") @Prefix("tasks_exec_attempt") @@ -31,8 +31,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false) -}) + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false) + }) @Tags({"site", "jobId", "JobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "host", "rack"}) public class TaskAttemptExecutionAPIEntity extends TaggedLogAPIEntity { @Column("a") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java index 50e042f..d1d62ee 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java @@ -24,7 +24,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eagleMRRunningTasks") @ColumnFamily("f") @Prefix("tasks_exec") @@ -32,8 +32,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false) -}) + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false) + }) @Tags({"site", "jobId", "JobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "hostname"}) public class TaskExecutionAPIEntity extends TaggedLogAPIEntity { @Column("a") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java index feeee7b..8a8d0db 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java @@ -20,6 +20,5 @@ package org.apache.eagle.jpm.spark.crawl; import java.io.InputStream; public interface JHFInputStreamReader { - public void read(InputStream is) throws Exception; - + void read(InputStream is) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java index 48701f7..62ba7d9 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java @@ -21,9 +21,9 @@ import java.io.InputStream; public interface JHFParserBase { /** - * this method will ensure to close the inputstream + * this method will ensure to close the inputStream. * @param is * @throws Exception */ - public void parse(InputStream is) throws Exception; + void parse(InputStream is) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java index 1b75e81..e298fa3 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java @@ -17,9 +17,6 @@ package org.apache.eagle.jpm.spark.crawl; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.jpm.spark.entity.JobConfig; import org.apache.eagle.jpm.spark.entity.*; import org.apache.eagle.jpm.util.JSONUtil; import org.apache.eagle.jpm.util.JobNameNormalization; @@ -28,6 +25,8 @@ import org.apache.eagle.jpm.util.SparkJobTagName; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.service.client.impl.EagleServiceBaseClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.slf4j.Logger; @@ -116,8 +115,8 @@ public class JHFSparkEventReader { List<String> jobConfs = conf.getStringList("basic.jobConf.additional.info"); String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port", - "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", - "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; + "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", + "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; jobConfs.addAll(Arrays.asList(props)); for (String prop : jobConfs) { if (sparkProps.containsKey(prop)) { @@ -363,9 +362,9 @@ public class JHFSparkEventReader { stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time")); if (stageInfo.containsKey("Failure Reason")) { - stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString()); + stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString()); } else { - stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString()); + stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString()); } } @@ -383,9 +382,9 @@ public class JHFSparkEventReader { JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result"); String result = JSONUtil.getString(jobResult, "Result"); if (result.equalsIgnoreCase("JobSucceeded")) { - job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString()); + job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString()); } else { - job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString()); + job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString()); } } @@ -429,15 +428,23 @@ public class JHFSparkEventReader { app.setExecutors(executors.values().size()); long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName())); - long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig()) ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName())); - int executoreCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName()); - int driverCore = this.isClientMode(app.getConfig()) ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName()); + long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig()) + ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) + : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName())); + + int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName()); + int driverCore = this.isClientMode(app.getConfig()) + ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) + : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName()); + long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead"); - long driverMemoryOverhead = this.isClientMode(app.getConfig()) ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead"); + long driverMemoryOverhead = this.isClientMode(app.getConfig()) + ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") + : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead"); app.setExecMemoryBytes(executorMemory); app.setDriveMemoryBytes(driverMemory); - app.setExecutorCores(executoreCore); + app.setExecutorCores(executorCore); app.setDriverCores(driverCore); app.setExecutorMemoryOverhead(executorMemoryOverhead); app.setDriverMemoryOverhead(driverMemoryOverhead); @@ -450,11 +457,12 @@ public class JHFSparkEventReader { executor.setMemoryOverhead(driverMemoryOverhead); } else { executor.setExecMemoryBytes(executorMemory); - executor.setCores(executoreCore); + executor.setCores(executorCore); executor.setMemoryOverhead(executorMemoryOverhead); } - if (executor.getEndTime() == 0) + if (executor.getEndTime() == 0) { executor.setEndTime(app.getEndTime()); + } this.aggregateExecutorToApp(executor); } this.flushEntities(executors.values(), false); @@ -464,16 +472,16 @@ public class JHFSparkEventReader { } private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) { - long result = 0l; + long result = 0L; if (config.getConfig().containsKey(fieldName)) { result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m"); - if(result == 0l){ - result = this.parseExecutorMemory(config.getConfig().get(fieldName)); + if (result == 0L) { + result = this.parseExecutorMemory(config.getConfig().get(fieldName)); } } - if(result == 0l){ - result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100); + if (result == 0L) { + result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100); } return result; } @@ -588,7 +596,7 @@ public class JHFSparkEventReader { job.setNumTask(job.getNumTask() + stage.getNumTasks()); - if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) { + if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) { //if multiple attempts succeed, just count one if (!hasStagePriorAttemptSuccess(stage)) { job.setNumCompletedStages(job.getNumCompletedStages() + 1); @@ -603,7 +611,7 @@ public class JHFSparkEventReader { Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString())); for (Integer i = 0; i < stageAttemptId; i++) { SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), i.toString())); - if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) { + if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) { return true; } } @@ -659,22 +667,22 @@ public class JHFSparkEventReader { if (memory.endsWith("g") || memory.endsWith("G")) { int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * 1024 * executorGB; + return 1024L * 1024 * 1024 * executorGB; } else if (memory.endsWith("m") || memory.endsWith("M")) { int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * executorMB; + return 1024L * 1024 * executorMB; } else if (memory.endsWith("k") || memory.endsWith("K")) { int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * executorKB; + return 1024L * executorKB; } else if (memory.endsWith("t") || memory.endsWith("T")) { int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * 1024 * 1024 * executorTB; + return 1024L * 1024 * 1024 * 1024 * executorTB; } else if (memory.endsWith("p") || memory.endsWith("P")) { int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1)); - return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB; + return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB; } - LOG.info("Cannot parse memory info " + memory); - return 0l; + LOG.info("Cannot parse memory info " + memory); + return 0L; } private void flushEntities(Object entity, boolean forceFlush) { @@ -709,20 +717,6 @@ public class JHFSparkEventReader { private void doFlush(List entities) throws Exception { LOG.info("start flushing entities of total number " + entities.size()); -// client.create(entities); LOG.info("finish flushing entities of total number " + entities.size()); -// for(Object entity: entities){ -// if(entity instanceof SparkApp){ -// for (Field field : entity.getClass().getDeclaredFields()) { -// field.setAccessible(true); // You might want to set modifier to public first. -// Object value = field.get(entity); -// if (value != null) { -// System.out.println(field.getName() + "=" + value); -// } -// } -// } -// } } - - } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java index 171cb0f..da049ea 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java @@ -33,7 +33,7 @@ public class JHFSparkParser implements JHFParserBase { JHFSparkEventReader eventReader; - public JHFSparkParser(JHFSparkEventReader reader){ + public JHFSparkParser(JHFSparkEventReader reader) { this.eventReader = reader; } @@ -41,22 +41,22 @@ public class JHFSparkParser implements JHFParserBase { public void parse(InputStream is) throws Exception { BufferedReader reader = new BufferedReader(new InputStreamReader(is)); try { - String line = null; + String line; JSONParser parser = new JSONParser(); - while((line = reader.readLine()) != null){ - try{ + while ((line = reader.readLine()) != null) { + try { JSONObject eventObj = (JSONObject) parser.parse(line); String eventType = (String) eventObj.get("Event"); logger.info("Event type: " + eventType); this.eventReader.read(eventObj); - }catch(Exception e){ + } catch (Exception e) { logger.error(String.format("Invalid json string. Fail to parse %s.", line), e); } } this.eventReader.clearReader(); } finally { - if(reader != null){ + if (reader != null) { reader.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java index f1d2cd1..3964454 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java @@ -31,7 +31,7 @@ public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReade private SparkApplicationInfo app; - public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app){ + public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) { this.site = site; this.app = app; } @@ -45,7 +45,7 @@ public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReade parser.parse(is); } - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo()); impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1"))); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java index 11c4a22..0664954 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java @@ -32,8 +32,9 @@ public class JobConfig implements Serializable { public void setConfig(Map<String, String> config) { this.config = config; } + @Override - public String toString(){ + public String toString() { return config.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java index 528a91f..58697a1 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java @@ -18,10 +18,10 @@ package org.apache.eagle.jpm.spark.entity; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.apache.eagle.jpm.util.Constants; @Table("eglesprk_apps") @ColumnFamily("f") @@ -31,7 +31,7 @@ import org.apache.eagle.jpm.util.Constants; @TimeSeries(true) @Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName","user", "queue"}) @Partition({"site"}) -public class SparkApp extends TaggedLogAPIEntity{ +public class SparkApp extends TaggedLogAPIEntity { @Column("a") private long startTime; @@ -222,11 +222,14 @@ public class SparkApp extends TaggedLogAPIEntity{ return driveMemoryBytes; } - public int getCompleteTasks(){ return completeTasks;} + public int getCompleteTasks() { + return completeTasks; + } public JobConfig getConfig() { return config; } + public void setStartTime(long startTime) { this.startTime = startTime; valueChanged("startTime"); @@ -377,7 +380,7 @@ public class SparkApp extends TaggedLogAPIEntity{ valueChanged("driveMemoryBytes"); } - public void setCompleteTasks(int completeTasks){ + public void setCompleteTasks(int completeTasks) { this.completeTasks = completeTasks; valueChanged("completeTasks"); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java index 366e4aa..4b669ef 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java @@ -18,10 +18,10 @@ package org.apache.eagle.jpm.spark.entity; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.apache.eagle.jpm.util.Constants; @Table("eglesprk_executors") @ColumnFamily("f") @@ -31,7 +31,7 @@ import org.apache.eagle.jpm.util.Constants; @TimeSeries(true) @Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "executorId","user", "queue"}) @Partition({"site"}) -public class SparkExecutor extends TaggedLogAPIEntity{ +public class SparkExecutor extends TaggedLogAPIEntity { @Column("a") private String hostPort; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java index acecb3a..79ac6da 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java @@ -18,10 +18,10 @@ package org.apache.eagle.jpm.spark.entity; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.apache.eagle.jpm.util.Constants; @Table("eglesprk_jobs") @ColumnFamily("f") @@ -31,34 +31,34 @@ import org.apache.eagle.jpm.util.Constants; @TimeSeries(true) @Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId","user", "queue"}) @Partition({"site"}) -public class SparkJob extends TaggedLogAPIEntity{ +public class SparkJob extends TaggedLogAPIEntity { @Column("a") private long submissionTime; @Column("b") private long completionTime; @Column("c") - private int numStages=0; + private int numStages = 0; @Column("d") private String status; @Column("e") - private int numTask=0; + private int numTask = 0; @Column("f") - private int numActiveTasks=0; + private int numActiveTasks = 0; @Column("g") - private int numCompletedTasks=0; + private int numCompletedTasks = 0; @Column("h") - private int numSkippedTasks=0; + private int numSkippedTasks = 0; @Column("i") - private int numFailedTasks=0; + private int numFailedTasks = 0; @Column("j") - private int numActiveStages=0; + private int numActiveStages = 0; @Column("k") - private int numCompletedStages=0; + private int numCompletedStages = 0; @Column("l") - private int numSkippedStages=0; + private int numSkippedStages = 0; @Column("m") - private int numFailedStages=0; + private int numFailedStages = 0; public long getSubmissionTime() { return submissionTime; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java index fcca889..3f56da6 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java @@ -18,10 +18,10 @@ package org.apache.eagle.jpm.spark.entity; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.apache.eagle.jpm.util.Constants; @Table("eglesprk_stages") @ColumnFamily("f") @@ -31,38 +31,38 @@ import org.apache.eagle.jpm.util.Constants; @TimeSeries(true) @Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "stageId","stageAttemptId","user", "queue"}) @Partition({"site"}) -public class SparkStage extends TaggedLogAPIEntity{ +public class SparkStage extends TaggedLogAPIEntity { @Column("a") private String status; @Column("b") - private int numActiveTasks=0; + private int numActiveTasks = 0; @Column("c") - private int numCompletedTasks=0; + private int numCompletedTasks = 0; @Column("d") - private int numFailedTasks=0; + private int numFailedTasks = 0; @Column("e") - private long executorRunTime=0l; + private long executorRunTime = 0L; @Column("f") - private long inputBytes=0l; + private long inputBytes = 0L; @Column("g") - private long inputRecords=0l; + private long inputRecords = 0L; @Column("h") - private long outputBytes=0l; + private long outputBytes = 0L; @Column("i") - private long outputRecords=0l; + private long outputRecords = 0L; @Column("j") - private long shuffleReadBytes=0l; + private long shuffleReadBytes = 0L; @Column("k") - private long shuffleReadRecords=0l; + private long shuffleReadRecords = 0L; @Column("l") - private long shuffleWriteBytes=0l; + private long shuffleWriteBytes = 0L; @Column("m") - private long shuffleWriteRecords=0l; + private long shuffleWriteRecords = 0L; @Column("n") - private long memoryBytesSpilled=0l; + private long memoryBytesSpilled = 0L; @Column("o") - private long diskBytesSpilled=0l; + private long diskBytesSpilled = 0L; @Column("p") private String name; @Column("q") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java index 6ef7c69..fb2fce5 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java @@ -18,10 +18,10 @@ package org.apache.eagle.jpm.spark.entity; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.apache.eagle.jpm.util.Constants; @Table("eglesprk_tasks") @ColumnFamily("f") @@ -31,7 +31,7 @@ import org.apache.eagle.jpm.util.Constants; @TimeSeries(true) @Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"}) @Partition({"site"}) -public class SparkTask extends TaggedLogAPIEntity{ +public class SparkTask extends TaggedLogAPIEntity { @Column("a") private int taskId; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java index 5b330fc..74489cd 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java @@ -30,48 +30,49 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * job history is the resource + * job history is the resource. */ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { private static final Logger LOG = LoggerFactory.getLogger(AbstractJobHistoryDAO.class); - private final static String YEAR_URL_FORMAT = "/%4d"; - private final static String MONTH_URL_FORMAT = "/%02d"; - private final static String DAY_URL_FORMAT = "/%02d"; - private final static String YEAR_MONTH_DAY_URL_FORMAT = YEAR_URL_FORMAT + MONTH_URL_FORMAT + DAY_URL_FORMAT; - protected final static String SERIAL_URL_FORMAT = "/%06d"; - protected final static String FILE_URL_FORMAT = "/%s"; + private static final String YEAR_URL_FORMAT = "/%4d"; + private static final String MONTH_URL_FORMAT = "/%02d"; + private static final String DAY_URL_FORMAT = "/%02d"; + private static final String YEAR_MONTH_DAY_URL_FORMAT = YEAR_URL_FORMAT + MONTH_URL_FORMAT + DAY_URL_FORMAT; + protected static final String SERIAL_URL_FORMAT = "/%06d"; + protected static final String FILE_URL_FORMAT = "/%s"; private static final Pattern JOBTRACKERNAME_PATTERN = Pattern.compile("^.*_(\\d+)_$"); protected static final Pattern JOBID_PATTERN = Pattern.compile("job_\\d+_\\d+"); - protected final String m_basePath; - protected volatile String m_jobTrackerName; + protected final String basePath; + protected volatile String jobTrackerName; public static final String JOB_CONF_POSTFIX = "_conf.xml"; - private final static Timer timer = new Timer(true); - private final static long JOB_TRACKER_SYNC_DURATION = 10 * 60 * 1000; // 10 minutes + private static final Timer timer = new Timer(true); + private static final long JOB_TRACKER_SYNC_DURATION = 10 * 60 * 1000; // 10 minutes - private boolean m_pathContainsJobTrackerName; + private boolean pathContainsJobTrackerName; public AbstractJobHistoryDAO(String basePath, boolean pathContainsJobTrackerName, String startingJobTrackerName) throws Exception { - m_basePath = basePath; - m_pathContainsJobTrackerName = pathContainsJobTrackerName; - m_jobTrackerName = startingJobTrackerName; - if (m_pathContainsJobTrackerName) { - if (startingJobTrackerName == null || startingJobTrackerName.isEmpty()) + this.basePath = basePath; + this.pathContainsJobTrackerName = pathContainsJobTrackerName; + jobTrackerName = startingJobTrackerName; + if (this.pathContainsJobTrackerName) { + if (startingJobTrackerName == null || startingJobTrackerName.isEmpty()) { throw new IllegalStateException("startingJobTrackerName should not be null or empty"); + } // start background thread to check what is current job tracker - startThread(m_basePath); + startThread(this.basePath); } } protected String buildWholePathToYearMonthDay(int year, int month, int day) { StringBuilder sb = new StringBuilder(); - sb.append(m_basePath); - if (!m_pathContainsJobTrackerName && m_jobTrackerName != null && !m_jobTrackerName.isEmpty()) { + sb.append(basePath); + if (!pathContainsJobTrackerName && jobTrackerName != null && !jobTrackerName.isEmpty()) { sb.append("/"); - sb.append(m_jobTrackerName); + sb.append(jobTrackerName); } sb.append(String.format(YEAR_MONTH_DAY_URL_FORMAT, year, month, day)); return sb.toString(); @@ -105,7 +106,7 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { sb.append(JOB_CONF_POSTFIX); return sb.toString(); } - LOG.warn("Illegal job history file name: "+jobHistFileName); + LOG.warn("Illegal job history file name: " + jobHistFileName); return null; } @@ -118,11 +119,11 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { try { LOG.info("regularly checking current jobTrackerName in background"); final String _jobTrackerName = calculateJobTrackerName(basePath); - if (_jobTrackerName != null && !_jobTrackerName.equals(m_jobTrackerName)) { - LOG.info("jobTrackerName changed from " + m_jobTrackerName +" to " + _jobTrackerName); - m_jobTrackerName = _jobTrackerName; + if (_jobTrackerName != null && !_jobTrackerName.equals(jobTrackerName)) { + LOG.info("jobTrackerName changed from " + jobTrackerName + " to " + _jobTrackerName); + jobTrackerName = _jobTrackerName; } - LOG.info("Current jobTrackerName is: " + m_jobTrackerName); + LOG.info("Current jobTrackerName is: " + jobTrackerName); } catch (Exception e) { LOG.error("failed to figure out current job tracker name that is not configured due to: " + e.getMessage(), e); } catch (Throwable t) { @@ -139,7 +140,7 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { try { downloadIs = getJHFFileContentAsStream(year, month, day, serialNumber, jobHistoryFileName); } catch (FileNotFoundException ex) { - LOG.error("job history file not found " + jobHistoryFileName+", ignore and will NOT process any more"); + LOG.error("job history file not found " + jobHistoryFileName + ", ignore and will NOT process any more"); return; } @@ -147,7 +148,7 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { try { downloadJobConfIs = getJHFConfContentAsStream(year, month, day, serialNumber, jobHistoryFileName); } catch (FileNotFoundException ex) { - LOG.warn("job configuration file of "+ jobHistoryFileName+" not found , ignore and use empty configuration"); + LOG.warn("job configuration file of " + jobHistoryFileName + " not found , ignore and use empty configuration"); } org.apache.hadoop.conf.Configuration conf = null; @@ -164,12 +165,12 @@ public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { } catch (Exception ex) { LOG.error("fail reading job history file", ex); throw ex; - } catch(Throwable t) { + } catch (Throwable t) { LOG.error("fail reading job history file", t); throw new Exception(t); } finally { try { - if(downloadJobConfIs != null) { + if (downloadJobConfIs != null) { downloadJobConfIs.close(); } if (downloadIs != null) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index aeb35fd..87cd4e0 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -32,30 +32,32 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { private static final Logger LOG = LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class); - private JobHistoryContentFilter m_filter; - private MRHistoryJobConfig m_configManager; + private JobHistoryContentFilter filter; + private MRHistoryJobConfig configManager; public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) { - this.m_filter = filter; - this.m_configManager = configManager; + this.filter = filter; + this.configManager = configManager; } @Override public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception { - final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig(); + final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); @SuppressWarnings("serial") - Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", jobExtractorConfig.site); - } }; + Map<String, String> baseTags = new HashMap<String, String>() { + { + put("site", jobExtractorConfig.site); + } + }; - if (!m_filter.acceptJobFile()) { + if (!filter.acceptJobFile()) { // close immediately if we don't need job file jobFileInputStream.close(); } else { //get parser and parse, do not need to emit data now - JHFParserBase parser = JHFParserFactory.getParser(m_configManager, + JHFParserBase parser = JHFParserFactory.getParser(configManager, baseTags, - conf, m_filter); + conf, filter); parser.parse(jobFileInputStream); jobFileInputStream.close(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java index 693e876..70eab38 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java @@ -21,9 +21,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray; import java.io.Serializable; /** - * expose simple interface for streaming executor to populate output data + * expose simple interface for streaming executor to populate output data. * */ -public interface EagleOutputCollector extends Serializable{ - void collect(ValuesArray t); +public interface EagleOutputCollector extends Serializable { + void collect(ValuesArray t); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java index 3edde5b..69eb94a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java @@ -20,8 +20,8 @@ package org.apache.eagle.jpm.mr.history.crawler; public interface JHFCrawlerDriver { /** - * return -1 if failed or there is no file to crawl - * return modified time of the file if succeed + * return -1 if failed or there is no file to crawl. + * return modified time of the file if succeed. */ long crawl() throws Exception; }