[EAGLE-422] eagle support for mr & spark running job monitoring Author: jinhuwu <jinh...@ebay.com> Author: pkuwm <ihuizhi...@gmail.com> Author: Zhao, Qingwen <qingwz...@ebay.com>
Closes #309 from wujinhu/develop. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/5bf2c62d Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/5bf2c62d Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/5bf2c62d Branch: refs/heads/develop Commit: 5bf2c62d264ebc7922b18d932995b4f2b46c4a1a Parents: 5c0db6a Author: jinhuwu <jinh...@ebay.com> Authored: Tue Aug 9 13:25:09 2016 +0800 Committer: Zhao, Qingwen <qingwz...@ebay.com> Committed: Tue Aug 9 13:25:09 2016 +0800 ---------------------------------------------------------------------- .../eagle/alert/engine/e2e/Integration1.java | 4 + .../eagle/jobrunning/ha/HAURLSelectorImpl.java | 9 +- .../eagle/jobrunning/util/InputStreamUtils.java | 5 + .../apache/eagle/jpm/spark/crawl/EventType.java | 24 + .../jpm/spark/crawl/JHFInputStreamReader.java | 25 + .../eagle/jpm/spark/crawl/JHFParserBase.java | 29 + .../jpm/spark/crawl/JHFSparkEventReader.java | 728 +++++++++++++++++++ .../eagle/jpm/spark/crawl/JHFSparkParser.java | 64 ++ .../jpm/spark/crawl/SparkApplicationInfo.java | 69 ++ .../SparkFilesystemInputStreamReaderImpl.java | 53 ++ .../eagle/jpm/mr/history/MRHistoryJobMain.java | 24 +- .../jpm/mr/history/common/JPAConstants.java | 95 --- .../eagle/jpm/mr/history/common/JobConfig.java | 38 - .../history/entities/JPAEntityRepository.java | 4 +- .../jpm/mr/history/entities/JobConfig.java | 38 + .../mr/history/entities/JobConfigSerDeser.java | 1 - .../entities/JobConfigurationAPIEntity.java | 9 +- .../history/entities/JobCountersSerDeser.java | 166 ----- .../mr/history/entities/JobEventAPIEntity.java | 4 +- .../history/entities/JobExecutionAPIEntity.java | 98 ++- .../entities/JobProcessTimeStampEntity.java | 4 +- .../entities/TaskAttemptCounterAPIEntity.java | 4 +- .../entities/TaskAttemptExecutionAPIEntity.java | 6 +- .../entities/TaskExecutionAPIEntity.java | 6 +- .../entities/TaskFailureCountAPIEntity.java 
| 4 +- .../jobcounter/CounterGroupDictionary.java | 238 ------ .../mr/history/jobcounter/CounterGroupKey.java | 32 - .../jpm/mr/history/jobcounter/CounterKey.java | 30 - .../history/jobcounter/JobCounterException.java | 63 -- .../jpm/mr/history/jobcounter/JobCounters.java | 47 -- .../jpm/mr/history/parser/EagleJobTagName.java | 48 -- .../mr/history/parser/JHFEventReaderBase.java | 161 ++-- .../mr/history/parser/JHFMRVer1EventReader.java | 2 +- .../mr/history/parser/JHFMRVer2EventReader.java | 5 +- .../parser/JobEntityLifecycleAggregator.java | 30 +- .../parser/TaskAttemptCounterListener.java | 13 +- .../mr/history/parser/TaskFailureListener.java | 17 +- .../src/main/resources/JobCounter.conf | 3 + .../src/main/resources/application.conf | 2 +- eagle-jpm/eagle-jpm-mr-running/pom.xml | 126 ++++ .../assembly/eagle-jpm-mr-running-assembly.xml | 65 ++ .../eagle/jpm/mr/running/MRRunningJobMain.java | 96 +++ .../running/config/MRRunningConfigManager.java | 142 ++++ .../running/entities/JPMEntityRepository.java | 32 + .../jpm/mr/running/entities/JobConfig.java | 25 + .../running/entities/JobExecutionAPIEntity.java | 437 +++++++++++ .../entities/TaskAttemptExecutionAPIEntity.java | 135 ++++ .../entities/TaskExecutionAPIEntity.java | 136 ++++ .../parser/MRJobEntityCreationHandler.java | 98 +++ .../jpm/mr/running/parser/MRJobParser.java | 553 ++++++++++++++ .../AbstractMetricsCreationListener.java | 42 ++ .../JobExecutionMetricsCreationListener.java | 60 ++ .../TaskExecutionMetricsCreationListener.java | 45 ++ .../mr/running/recover/MRRunningJobManager.java | 80 ++ .../running/storm/MRRunningJobFetchSpout.java | 171 +++++ .../mr/running/storm/MRRunningJobParseBolt.java | 114 +++ .../src/main/resources/JobCounter.conf | 187 +++++ .../src/main/resources/application.conf | 86 +++ .../src/main/resources/log4j.properties | 35 + .../jpm/spark/history/crawl/EventType.java | 24 - .../history/crawl/JHFInputStreamReader.java | 25 - .../jpm/spark/history/crawl/JHFParserBase.java | 29 - 
.../history/crawl/JHFSparkEventReader.java | 701 ------------------ .../jpm/spark/history/crawl/JHFSparkParser.java | 63 -- .../history/crawl/SparkApplicationInfo.java | 69 -- .../SparkHistoryFileInputStreamReaderImpl.java | 53 -- .../status/JobHistoryZKStateManager.java | 2 +- .../history/storm/FinishedSparkJobSpout.java | 11 +- .../spark/history/storm/SparkJobParseBolt.java | 127 ++-- .../src/main/resources/application.conf | 4 +- eagle-jpm/eagle-jpm-spark-running/pom.xml | 158 +++- .../eagle-jpm-spark-running-assembly.xml | 65 ++ .../jpm/spark/running/SparkRunningJobMain.java | 84 +++ .../common/SparkRunningConfigManager.java | 151 ++++ .../eagle/jpm/spark/running/common/Util.java | 35 + .../running/entities/JPMEntityRepository.java | 30 + .../jpm/spark/running/entities/JobConfig.java | 25 + .../spark/running/entities/SparkAppEntity.java | 472 ++++++++++++ .../running/entities/SparkExecutorEntity.java | 232 ++++++ .../spark/running/entities/SparkJobEntity.java | 190 +++++ .../running/entities/SparkStageEntity.java | 298 ++++++++ .../spark/running/entities/SparkTaskEntity.java | 289 ++++++++ .../parser/SparkAppEntityCreationHandler.java | 73 ++ .../running/parser/SparkApplicationParser.java | 647 ++++++++++++++++ .../running/recover/SparkRunningJobManager.java | 80 ++ .../storm/SparkRunningJobFetchSpout.java | 178 +++++ .../running/storm/SparkRunningJobParseBolt.java | 110 +++ .../services/org.apache.hadoop.fs.FileSystem | 20 + .../src/main/resources/application.conf | 66 ++ .../src/main/resources/log4j.properties | 35 + .../org/apache/eagle/jpm/util/Constants.java | 123 +++- .../org/apache/eagle/jpm/util/HDFSUtil.java | 2 +- .../org/apache/eagle/jpm/util/MRJobTagName.java | 48 ++ .../java/org/apache/eagle/jpm/util/Utils.java | 89 +++ .../util/jobcounter/CounterGroupDictionary.java | 238 ++++++ .../jpm/util/jobcounter/CounterGroupKey.java | 32 + .../eagle/jpm/util/jobcounter/CounterKey.java | 30 + .../util/jobcounter/JobCounterException.java | 63 ++ 
.../eagle/jpm/util/jobcounter/JobCounters.java | 48 ++ .../util/jobcounter/JobCountersSerDeser.java | 165 +++++ .../jpm/util/jobrecover/RunningJobManager.java | 255 +++++++ .../util/resourceFetch/RMResourceFetcher.java | 103 ++- .../jpm/util/resourceFetch/ResourceFetcher.java | 6 +- .../SparkHistoryServerResourceFetcher.java | 8 +- .../jpm/util/resourceFetch/model/AppInfo.java | 67 +- .../util/resourceFetch/model/AppsWrapper.java | 3 +- .../util/resourceFetch/model/ClusterInfo.java | 119 +++ .../resourceFetch/model/ClusterInfoWrapper.java | 35 + .../resourceFetch/model/JobCounterGroup.java | 42 ++ .../resourceFetch/model/JobCounterItem.java | 55 ++ .../util/resourceFetch/model/JobCounters.java | 42 ++ .../resourceFetch/model/JobCountersWrapper.java | 35 + .../jpm/util/resourceFetch/model/MRJob.java | 289 ++++++++ .../util/resourceFetch/model/MRJobsWrapper.java | 37 + .../jpm/util/resourceFetch/model/MRTask.java | 109 +++ .../util/resourceFetch/model/MRTaskAttempt.java | 136 ++++ .../model/MRTaskAttemptWrapper.java | 37 + .../resourceFetch/model/MRTaskAttempts.java | 39 + .../jpm/util/resourceFetch/model/MRTasks.java | 40 + .../resourceFetch/model/MRTasksWrapper.java | 37 + .../jpm/util/resourceFetch/model/MrJobs.java | 39 + .../util/resourceFetch/model/SparkExecutor.java | 155 ++++ .../jpm/util/resourceFetch/model/SparkJob.java | 165 +++++ .../util/resourceFetch/model/SparkStage.java | 211 ++++++ .../jpm/util/resourceFetch/model/SparkTask.java | 111 +++ .../model/SparkTaskInputMetrics.java | 46 ++ .../resourceFetch/model/SparkTaskMetrics.java | 118 +++ .../model/SparkTaskShuffleReadMetrics.java | 82 +++ .../model/SparkTaskShuffleWriteMetrics.java | 55 ++ .../resourceFetch/model/TaskCounterGroup.java | 45 ++ .../resourceFetch/model/TaskCounterItem.java | 44 ++ .../util/resourceFetch/model/TaskCounters.java | 45 ++ .../model/TaskCountersWrapper.java | 35 + .../url/JobListServiceURLBuilderImpl.java | 31 +- .../SparkCompleteJobServiceURLBuilderImpl.java | 9 +- 
.../url/SparkJobServiceURLBuilderImpl.java | 4 +- .../jpm/util/resourceFetch/url/URLUtil.java | 33 + eagle-jpm/pom.xml | 1 + 138 files changed, 10775 insertions(+), 1999 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java index 8b28080..a00d75f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java @@ -147,7 +147,11 @@ public class Integration1 { ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$); Properties topicConfiguration = new Properties(); +// ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration);// RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration); + } public static void proactive_schedule(Config config) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java 
b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java index 21a81ed..e030cf3 100644 --- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java +++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java @@ -47,13 +47,12 @@ public class HAURLSelectorImpl implements HAURLSelector { public boolean checkUrl(String urlString) { InputStream is = null; try { + LOG.info("Getting input stream from url: " + urlString); is = InputStreamUtils.getInputStream(urlString, compressionType); - } - catch (Exception ex) { - LOG.info("get inputstream from url: " + urlString + " failed. "); + } catch (Exception ex) { + LOG.error("Failed to get input stream from url: " + urlString); return false; - } - finally { + } finally { if (is != null) { try { is.close(); } catch (IOException e) {/*Do nothing*/} } } return true; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java index 62a15af..03c8ba6 100644 --- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java +++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java @@ -23,8 +23,12 @@ import java.net.URLConnection; import java.util.zip.GZIPInputStream; import 
org.apache.eagle.jobrunning.common.JobConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InputStreamUtils { + private static final Logger LOG = LoggerFactory.getLogger(InputStreamUtils.class); + private static final int CONNECTION_TIMEOUT = 10 * 1000; private static final int READ_TIMEOUT = 5 * 60 * 1000; @@ -47,6 +51,7 @@ public class InputStreamUtils { public static InputStream getInputStream(String urlString, JobConstants.CompressionType compressionType, int timeout) throws Exception { final URL url = URLConnectionUtils.getUrl(urlString); + LOG.info("Open connection. compression type:" + compressionType + "; URL: " + url.toString()); if (compressionType.equals(JobConstants.CompressionType.GZIP)) { return openGZIPInputStream(url, timeout); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java new file mode 100644 index 0000000..1ba15b7 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.crawl; + +public enum EventType { + SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart, + SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved, + SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java new file mode 100644 index 0000000..feeee7b --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.crawl; + +import java.io.InputStream; + +public interface JHFInputStreamReader { + public void read(InputStream is) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java new file mode 100644 index 0000000..48701f7 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.crawl; + +import java.io.InputStream; + +public interface JHFParserBase { + /** + * this method will ensure to close the inputstream + * @param is + * @throws Exception + */ + public void parse(InputStream is) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java new file mode 100644 index 0000000..d5fda5a --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java @@ -0,0 +1,728 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.crawl; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import jline.internal.Log; +import org.apache.eagle.jpm.entity.*; +import org.apache.eagle.jpm.util.JSONUtil; +import org.apache.eagle.jpm.util.JobNameNormalization; +import org.apache.eagle.jpm.util.SparkEntityConstant; +import org.apache.eagle.jpm.util.SparkJobTagName; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.service.client.impl.EagleServiceBaseClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class JHFSparkEventReader { + private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class); + + public static final int FLUSH_LIMIT = 500; + private long firstTaskLaunchTime; + + private Map<String, SparkExecutor> executors; + private SparkApp app; + private Map<Integer, SparkJob> jobs; + private Map<String, SparkStage> stages; + private Map<Integer, Set<String>> jobStageMap; + private Map<Integer, SparkTask> tasks; + private EagleServiceClientImpl client; + private Map<String, Map<Integer, Boolean>> stageTaskStatusMap; + + private List<TaggedLogAPIEntity> createEntities; + + private Config conf; + + public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) { + app = new SparkApp(); + app.setTags(new HashMap<String, String>(baseTags)); + app.setYarnState(info.getState()); + app.setYarnStatus(info.getFinalStatus()); + createEntities = new ArrayList<>(); + jobs = new HashMap<Integer, SparkJob>(); + stages = new HashMap<String, SparkStage>(); + jobStageMap = new HashMap<Integer, Set<String>>(); + tasks = new HashMap<Integer, SparkTask>(); + executors = new HashMap<String, SparkExecutor>(); + stageTaskStatusMap = new HashMap<>(); + conf = ConfigFactory.load(); + 
this.initiateClient(); + } + + public SparkApp getApp() { + return this.app; + } + + public void read(JSONObject eventObj) throws Exception { + String eventType = (String) eventObj.get("Event"); + LOG.info("Event type: " + eventType); + if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) { + handleAppStarted(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) { + handleEnvironmentSet(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) { + handleExecutorAdd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) { + handleBlockManagerAdd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) { + handleJobStart(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) { + handleStageSubmit(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) { + handleTaskStart(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) { + handleTaskEnd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) { + handleStageComplete(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) { + handleJobEnd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) { + handleExecutorRemoved(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) { + handleAppEnd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) { + //nothing to do now + } else { + LOG.info("Not registered event type:" + eventType); + } + + } + + + private void handleEnvironmentSet(JSONObject event) { + app.setConfig(new JobConfig()); + JSONObject 
sparkProps = (JSONObject) event.get("Spark Properties"); + + List<String> jobConfs = conf.getStringList("basic.jobConf.additional.info"); + String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port", + "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", + "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; + jobConfs.addAll(Arrays.asList(props)); + for (String prop : jobConfs) { + if (sparkProps.containsKey(prop)) { + app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop)); + } + } + } + + private Object getConfigVal(JobConfig config, String configName, String type) { + if (config.getConfig().containsKey(configName)) { + Object val = config.getConfig().get(configName); + if (type.equalsIgnoreCase(Integer.class.getName())) { + return Integer.parseInt((String) val); + } else { + return val; + } + } else { + if (type.equalsIgnoreCase(Integer.class.getName())) { + return conf.getInt("spark.defaultVal." + configName); + } else { + return conf.getString("spark.defaultVal." 
+ configName); + } + } + } + + + private boolean isClientMode(JobConfig config) { + if (config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client")) { + return true; + } else { + return false; + } + } + + + private void handleAppStarted(JSONObject event) { + //need update all entities tag before app start + List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); + entities.addAll(this.executors.values()); + entities.add(this.app); + + long appStartTime = JSONUtil.getLong(event, "Timestamp"); + for (TaggedLogAPIEntity entity : entities) { + entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtil.getString(event, "App ID")); + entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtil.getString(event, "App Name")); + // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1. + String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtil.getString(event, "App Attempt ID"); + entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId); + // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text + // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text + entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtil.getString(event, "App Name"), null)); + entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtil.getString(event, "User")); + + entity.setTimestamp(appStartTime); + } + + this.app.setStartTime(appStartTime); + } + + private void handleExecutorAdd(JSONObject event) throws Exception { + String executorID = (String) event.get("Executor ID"); + SparkExecutor executor = this.initiateExecutor(executorID, JSONUtil.getLong(event, "Timestamp")); + + JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor Info"); + + } + + private void handleBlockManagerAdd(JSONObject event) throws 
Exception { + long maxMemory = JSONUtil.getLong(event, "Maximum Memory"); + long timestamp = JSONUtil.getLong(event, "Timestamp"); + JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID"); + String executorID = JSONUtil.getString(blockInfo, "Executor ID"); + String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo, "Port")); + + SparkExecutor executor = this.initiateExecutor(executorID, timestamp); + executor.setMaxMemory(maxMemory); + executor.setHostPort(hostport); + } + + private void handleTaskStart(JSONObject event) { + this.initializeTask(event); + } + + private void handleTaskEnd(JSONObject event) { + JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info"); + Integer taskId = JSONUtil.getInt(taskInfo, "Task ID"); + SparkTask task = null; + if (tasks.containsKey(taskId)) { + task = tasks.get(taskId); + } else { + return; + } + + task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed")); + JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics"); + if (null != taskMetrics) { + task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, "Executor Deserialize Time")); + task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor Run Time")); + task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time")); + task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size")); + task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, "Result Serialization Time")); + task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory Bytes Spilled")); + task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes Spilled")); + + JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, "Input Metrics"); + if (null != inputMetrics) { + task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes Read")); + task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records Read")); + } + + JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, "Output Metrics"); + 
if (null != outputMetrics) { + task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes Written")); + task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records Written")); + } + + JSONObject shuffleWriteMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics"); + if (null != shuffleWriteMetrics) { + task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes Written")); + task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Records Written")); + } + + JSONObject shuffleReadMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics"); + if (null != shuffleReadMetrics) { + task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes Read")); + task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote Bytes Read")); + task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records Read")); + } + } else { + //for tasks success without task metrics, save in the end if no other information + if (!task.isFailed()) { + return; + } + } + + aggregateToStage(task); + aggregateToExecutor(task); + tasks.remove(taskId); + this.flushEntities(task, false); + } + + + private SparkTask initializeTask(JSONObject event) { + SparkTask task = new SparkTask(); + task.setTags(new HashMap(this.app.getTags())); + task.setTimestamp(app.getTimestamp()); + + task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), JSONUtil.getLong(event, "Stage ID").toString()); + task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), JSONUtil.getLong(event, "Stage Attempt ID").toString()); + + JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info"); + int taskId = JSONUtil.getInt(taskInfo, "Task ID"); + task.setTaskId(taskId); + + task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), JSONUtil.getInt(taskInfo, "Index").toString()); + task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), JSONUtil.getInt(taskInfo, "Attempt").toString()); + 
long launchTime = JSONUtil.getLong(taskInfo, "Launch Time");
        if (taskId == 0) {
            this.setFirstTaskLaunchTime(launchTime);
        }
        task.setLaunchTime(launchTime);
        task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID"));
        task.setHost(JSONUtil.getString(taskInfo, "Host"));
        task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality"));
        task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative"));

        tasks.put(task.getTaskId(), task);
        return task;
    }

    /** Remembers the launch time of the task with "Task ID" 0. */
    private void setFirstTaskLaunchTime(long launchTime) {
        this.firstTaskLaunchTime = launchTime;
    }

    /** Returns the launch time recorded by {@link #setFirstTaskLaunchTime(long)}. */
    private long getFirstTaskLaunchTime() {
        return this.firstTaskLaunchTime;
    }

    /**
     * Creates the {@code SparkJob} for a job-start event and registers every
     * stage listed under "Stage Infos" for that job.
     */
    private void handleJobStart(JSONObject event) {
        SparkJob job = new SparkJob();
        job.setTags(new HashMap(this.app.getTags()));
        job.setTimestamp(app.getTimestamp());

        Integer jobId = JSONUtil.getInt(event, "Job ID");
        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
        job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time"));

        //for complete application, no active stages/tasks
        job.setNumActiveStages(0);
        job.setNumActiveTasks(0);

        this.jobs.put(jobId, job);
        this.jobStageMap.put(jobId, new HashSet<String>());

        JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos");
        job.setNumStages(stages.size());
        for (int i = 0; i < stages.size(); i++) {
            JSONObject stageInfo = (JSONObject) stages.get(i);
            Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
            Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
            String stageName = JSONUtil.getString(stageInfo, "Stage Name");
            int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
            this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
        }
    }

    /**
     * Starts task-status tracking for the submitted stage attempt. When the
     * attempt is not known yet but attempt 0 of the same stage is, a new stage
     * entity is created under the job of attempt 0.
     */
    private void handleStageSubmit(JSONObject event) {
        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());

        if (!stages.containsKey(key)) {
            //may be further attempt for one stage
            String baseAttempt = this.generateStageKey(stageId.toString(), "0");
            if (stages.containsKey(baseAttempt)) {
                SparkStage stage = stages.get(baseAttempt);
                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());

                // "Stage Name" is a field of the nested "Stage Info" object (see handleJobStart),
                // not of the top-level event.
                String stageName = JSONUtil.getString(stageInfo, "Stage Name");
                int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
            }
        }
    }

    /**
     * Stores submit/complete time and final status on the completed stage attempt.
     * Completion events for stage attempts that were never registered are logged and ignored.
     */
    private void handleStageComplete(JSONObject event) {
        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
        SparkStage stage = stages.get(key);
        if (stage == null) {
            LOG.warn("Ignore completion event of unknown stage attempt {}", key);
            return;
        }

        // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
        Long submissionTime = JSONUtil.getLong(stageInfo, "Submission Time");
        if (submissionTime == null) {
            submissionTime = this.getFirstTaskLaunchTime();
        }
        stage.setSubmitTime(submissionTime);
        stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time"));

        if (stageInfo.containsKey("Failure Reason")) {
            stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString());
        } else {
            stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString());
        }
    }

    /** Sets the end time of the removed executor; unknown executor ids are logged and ignored. */
    private void handleExecutorRemoved(JSONObject event) {
        String executorID = JSONUtil.getString(event, "Executor ID");
        SparkExecutor executor = executors.get(executorID);
        if (executor == null) {
            LOG.warn("Ignore removal event of unknown executor {}", executorID);
            return;
        }
        executor.setEndTime(JSONUtil.getLong(event, "Timestamp"));
    }

    /**
     * Stores completion time and final status on the finished job. Any result
     * other than "JobSucceeded" (case-insensitive) marks the job as failed.
     */
    private void handleJobEnd(JSONObject event) {
        Integer jobId = JSONUtil.getInt(event, "Job ID");
        SparkJob job = jobs.get(jobId);
        if (job == null) {
            LOG.warn("Ignore end event of unknown job {}", jobId);
            return;
        }
        job.setCompletionTime(JSONUtil.getLong(event, "Completion Time"));
        JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
        String result = JSONUtil.getString(jobResult, "Result");
        // constant-first comparison: a missing "Result" counts as failure instead of throwing
        if ("JobSucceeded".equalsIgnoreCase(result)) {
            job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString());
        } else {
            job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString());
        }
    }

    /** Stores the application end time taken from the event's "Timestamp". */
    private void handleAppEnd(JSONObject event) {
        long endTime = JSONUtil.getLong(event, "Timestamp");
        app.setEndTime(endTime);
    }

    /**
     * Finishes processing of one event log: marks the tasks still held in
     * {@code tasks} as failed, rolls metrics up from task to stage, job,
     * executor and application level, and flushes all entities. The final
     * flush of the application entity is forced.
     *
     * @throws Exception declared by the original signature
     */
    public void clearReader() throws Exception {
        //clear tasks
        for (SparkTask task : tasks.values()) {
            LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
            task.setFailed(true);
            aggregateToStage(task);
            aggregateToExecutor(task);
            this.flushEntities(task, false);
        }

        List<SparkStage> needStoreStages = new ArrayList<>();
        for (SparkStage stage : this.stages.values()) {
            Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
                // no submit or complete time recorded: counted as skipped and not stored
                SparkJob job = this.jobs.get(jobId);
                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
            } else {
                this.aggregateToJob(stage);
                this.aggregateStageToApp(stage);
                needStoreStages.add(stage);
            }
            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
        }

        this.flushEntities(needStoreStages, false);
        for (SparkJob job : jobs.values()) {
            this.aggregateJobToApp(job);
        }
        this.flushEntities(jobs.values(), false);

        app.setExecutors(executors.values().size());
        long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
        // in client mode the driver-side values come from the spark.yarn.am.* settings
        boolean clientMode = this.isClientMode(app.getConfig());
        long driverMemory = parseExecutorMemory(clientMode
                ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
                : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
        int executorCores = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
        int driverCores = clientMode
                ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
                : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
        long driverMemoryOverhead = clientMode
                ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
                : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");

        app.setExecMemoryBytes(executorMemory);
        app.setDriveMemoryBytes(driverMemory);
        app.setExecutorCores(executorCores);
        app.setDriverCores(driverCores);
        app.setExecutorMemoryOverhead(executorMemoryOverhead);
        app.setDriverMemoryOverhead(driverMemoryOverhead);

        for (SparkExecutor executor : executors.values()) {
            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
            if (executorID.equalsIgnoreCase("driver")) {
                executor.setExecMemoryBytes(driverMemory);
                executor.setCores(driverCores);
                executor.setMemoryOverhead(driverMemoryOverhead);
            } else {
                executor.setExecMemoryBytes(executorMemory);
                executor.setCores(executorCores);
                executor.setMemoryOverhead(executorMemoryOverhead);
            }
            // executors without a removal event end when the application ends
            if (executor.getEndTime() == 0) {
                executor.setEndTime(app.getEndTime());
            }
            this.aggregateExecutorToApp(executor);
        }
        this.flushEntities(executors.values(), false);
        //spark code...tricky
        // NOTE(review): this overwrites the skipped-task count accumulated in aggregateJobToApp
        // with the completed-task count; kept as in the original - confirm this is intended.
        app.setSkippedTasks(app.getCompleteTasks());
        this.flushEntities(app, true);
    }

    /**
     * Resolves a memory overhead setting in bytes.
     *
     * <p>The configured value is first read as megabytes (an "m" suffix is
     * appended); if that yields 0 it is parsed as written. If the result is
     * still 0, the default is the larger of the configured minimum overhead
     * and {@code executorMemory * factor / 100}, where the factor is read from
     * {@code spark.defaultVal.<fieldName>.factor}.
     *
     * @param config         job configuration to look the field up in
     * @param executorMemory memory in bytes the overhead belongs to
     * @param fieldName      configuration key of the overhead setting
     * @return overhead in bytes
     */
    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
        long result = 0L;
        if (config.getConfig().containsKey(fieldName)) {
            String configured = config.getConfig().get(fieldName);
            result = this.parseExecutorMemory(configured + "m");
            if (result == 0L) {
                // the value already carries its own unit suffix, e.g. "1g"
                result = this.parseExecutorMemory(configured);
            }
        }

        if (result == 0L) {
            result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
        }
        return result;
    }

    /** Adds the executor's lifetime (end time minus start time) to the application total. */
    private void aggregateExecutorToApp(SparkExecutor executor) {
        app.setTotalExecutorTime(app.getTotalExecutorTime() + (executor.getEndTime() - executor.getStartTime()));
    }

    /** Adds the job's task and stage counters to the application totals. */
    private void aggregateJobToApp(SparkJob job) {
        //aggregate job level metrics
        app.setNumJobs(app.getNumJobs() + 1);
        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
        app.setTotalStages(app.getTotalStages() + job.getNumStages());
        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
    }

    /** Adds the stage's I/O, shuffle, spill and timing metrics to the application totals. */
    private void aggregateStageToApp(SparkStage stage) {
        //aggregate task level metrics
        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
        app.setResultSize(app.getResultSize() + stage.getResultSize());
        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
    }

    /**
     * Adds the task's metrics to its stage attempt and updates the stage's
     * completed/failed task counters. Counters are kept per task index: when a
     * later attempt of a previously failed index succeeds, one failure is
     * converted into one completion.
     */
    private void aggregateToStage(SparkTask task) {
        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
        String key = this.generateStageKey(stageId, stageAttemptId);
        SparkStage stage = stages.get(key);

        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
        stage.setResultSize(stage.getResultSize() + task.getResultSize());
        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);

        boolean success = !task.isFailed();

        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
            //has previous task attempt, retrieved from task index in one stage
            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
            success = previousResult || success;
            if (previousResult != success) {
                // failed before, succeeded now: move the index from failed to completed
                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
                stageTaskStatusMap.get(key).put(taskIndex, success);
            }
        } else {
            if (success) {
                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
            } else {
                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
            }
            stageTaskStatusMap.get(key).put(taskIndex, success);
        }
    }

    /**
     * Adds the task's counters and metrics to the executor it ran on. Tasks
     * whose executor is not registered are skipped.
     */
    private void aggregateToExecutor(SparkTask task) {
        String executorId = task.getExecutorId();
        SparkExecutor executor = executors.get(executorId);

        if (null != executor) {
            executor.setTotalTasks(executor.getTotalTasks() + 1);
            if (task.isFailed()) {
                executor.setFailedTasks(executor.getFailedTasks() + 1);
            } else {
                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
            }
            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
            // the run time is added exactly once per task
            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
        }
    }

    /**
     * Adds the stage's task counters to its job and counts the stage as
     * completed (once per stage, even if several attempts succeed) or failed.
     */
    private void aggregateToJob(SparkStage stage) {
        Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
        SparkJob job = jobs.get(jobId);
        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
        job.setNumTask(job.getNumTask() + stage.getNumTasks());

        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
            //if multiple attempts succeed, just count one
            if (!hasStagePriorAttemptSuccess(stage)) {
                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
            }
        } else {
            job.setNumFailedStages(job.getNumFailedStages() + 1);
        }
    }

    /**
     * Tells whether any earlier attempt of the same stage has status COMPLETE.
     * Attempts that were never registered or have no status yet count as not successful.
     */
    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
        int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
        String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
        String complete = SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString();
        for (int attempt = 0; attempt < stageAttemptId; attempt++) {
            SparkStage previousStage = stages.get(this.generateStageKey(stageId, Integer.toString(attempt)));
            if (previousStage != null && complete.equalsIgnoreCase(previousStage.getStatus())) {
                return true;
            }
        }
        return false;
    }

    /** Builds the map key of a stage attempt: {@code <stageId>-<stageAttemptId>}. */
    private String generateStageKey(String stageId, String stageAttemptId) {
        return String.format("%s-%s", stageId, stageAttemptId);
    }

    /**
     * Creates a stage entity tagged with job id, stage id and attempt id,
     * stores it in {@code stages} and adds its key to the job's stage set.
     * The scheduling pool falls back to "default" when "spark.scheduler.pool" is not configured.
     */
    private void initiateStage(Integer jobId, Integer stageId, Integer stageAttemptId, String name, int numTasks) {
        SparkStage stage = new SparkStage();
        stage.setTags(new HashMap(this.app.getTags()));
        stage.setTimestamp(app.getTimestamp());
        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageId.toString());
        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptId.toString());
        stage.setName(name);
        stage.setNumActiveTasks(0);
        stage.setNumTasks(numTasks);
        String pool = this.app.getConfig().getConfig().get("spark.scheduler.pool");
        stage.setSchedulingPool(pool == null ? "default" : pool);

        String stageKey = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
        stages.put(stageKey, stage);
        this.jobStageMap.get(jobId).add(stageKey);
    }

    /**
     * Returns the executor registered under {@code executorID}, creating it
     * with the given start time if it does not exist yet.
     *
     * @throws Exception declared by the original signature
     */
    private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception {
        if (!executors.containsKey(executorID)) {
            SparkExecutor executor = new SparkExecutor();
            executor.setTags(new HashMap(this.app.getTags()));
            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
            executor.setStartTime(startTime);
            executor.setTimestamp(app.getTimestamp());

            this.executors.put(executorID, executor);
        }

        return this.executors.get(executorID);
    }

    /** Returns {@code assignedName} when given, otherwise the normalized form of {@code jobName}. */
    private String getNormalizedName(String jobName, String assignedName) {
        if (null != assignedName) {
            return assignedName;
        } else {
            return JobNameNormalization.getInstance().normalize(jobName);
        }
    }

    /**
     * Converts a memory setting such as "512m" or "2G" to bytes.
     *
     * <p>Supported suffixes (either case) are k, m, g, t and p, using powers of
     * 1024. Anything else - null, empty, no known suffix, or a non-integer
     * number part - is logged and yields 0, so callers can fall back to another
     * interpretation or a default.
     *
     * @param memory memory setting, may be null
     * @return size in bytes, or 0 when the value cannot be parsed
     */
    private long parseExecutorMemory(String memory) {
        if (memory == null || memory.isEmpty()) {
            LOG.info("Cannot parse memory info " + memory);
            return 0L;
        }

        long unit;
        switch (memory.charAt(memory.length() - 1)) {
            case 'k':
            case 'K':
                unit = 1024L;
                break;
            case 'm':
            case 'M':
                unit = 1024L * 1024;
                break;
            case 'g':
            case 'G':
                unit = 1024L * 1024 * 1024;
                break;
            case 't':
            case 'T':
                unit = 1024L * 1024 * 1024 * 1024;
                break;
            case 'p':
            case 'P':
                unit = 1024L * 1024 * 1024 * 1024 * 1024;
                break;
            default:
                LOG.info("Cannot parse memory info " + memory);
                return 0L;
        }

        try {
            return unit * Integer.parseInt(memory.substring(0, memory.length() - 1));
        } catch (NumberFormatException e) {
            // e.g. "1gm" produced by getMemoryOverhead's megabyte-first attempt
            LOG.info("Cannot parse memory info " + memory);
            return 0L;
        }
    }
+ + private void flushEntities(Object entity, boolean forceFlush) { + this.flushEntities(Arrays.asList(entity), forceFlush); + } + + private void flushEntities(Collection entities, boolean forceFlush) { + this.createEntities.addAll(entities); + + if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) { + try { + this.doFlush(this.createEntities); + this.createEntities.clear(); + } catch (Exception e) { + LOG.error("Fail to flush entities", e); + } + + } + } + + private EagleServiceBaseClient initiateClient() { + String host = conf.getString("eagleProps.eagle.service.host"); + int port = conf.getInt("eagleProps.eagle.service.port"); + String userName = conf.getString("eagleProps.eagle.service.userName"); + String pwd = conf.getString("eagleProps.eagle.service.pwd"); + client = new EagleServiceClientImpl(host, port, userName, pwd); + int timeout = conf.getInt("eagleProps.eagle.service.read_timeout"); + client.getJerseyClient().setReadTimeout(timeout * 1000); + + return client; + } + + private void doFlush(List entities) throws Exception { + LOG.info("start flushing entities of total number " + entities.size()); +// client.create(entities); + LOG.info("finish flushing entities of total number " + entities.size()); +// for(Object entity: entities){ +// if(entity instanceof SparkApp){ +// for (Field field : entity.getClass().getDeclaredFields()) { +// field.setAccessible(true); // You might want to set modifier to public first. 
+// Object value = field.get(entity); +// if (value != null) { +// System.out.println(field.getName() + "=" + value); +// } +// } +// } +// } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java new file mode 100644 index 0000000..171cb0f --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.crawl; + + +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class JHFSparkParser implements JHFParserBase { + + private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class); + + JHFSparkEventReader eventReader; + + public JHFSparkParser(JHFSparkEventReader reader){ + this.eventReader = reader; + } + + @Override + public void parse(InputStream is) throws Exception { + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + try { + String line = null; + + JSONParser parser = new JSONParser(); + while((line = reader.readLine()) != null){ + try{ + JSONObject eventObj = (JSONObject) parser.parse(line); + String eventType = (String) eventObj.get("Event"); + logger.info("Event type: " + eventType); + this.eventReader.read(eventObj); + }catch(Exception e){ + logger.error(String.format("Invalid json string. Fail to parse %s.", line), e); + } + } + this.eventReader.clearReader(); + } finally { + if(reader != null){ + reader.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java new file mode 100644 index 0000000..423d045 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java @@ -0,0 +1,69 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. 
See the NOTICE file distributed with
 *  * this work for additional information regarding copyright ownership.
 *  * The ASF licenses this file to You under the Apache License, Version 2.0
 *  * (the "License"); you may not use this file except in compliance with
 *  * the License. You may obtain a copy of the License at
 *  *
 *  * http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */

package org.apache.eagle.jpm.spark.crawl;

/**
 * Mutable value holder describing one Spark application: name, user, queue,
 * state and final status. All properties are plain strings and start as null.
 */
public class SparkApplicationInfo {

    // identity of the application
    private String name;
    private String user;
    private String queue;

    // lifecycle information
    private String state;
    private String finalStatus;

    /** @return the application name, or null if not set */
    public String getName() {
        return name;
    }

    /** @param name the application name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the submitting user, or null if not set */
    public String getUser() {
        return user;
    }

    /** @param user the submitting user */
    public void setUser(String user) {
        this.user = user;
    }

    /** @return the queue, or null if not set */
    public String getQueue() {
        return queue;
    }

    /** @param queue the queue */
    public void setQueue(String queue) {
        this.queue = queue;
    }

    /** @return the application state, or null if not set */
    public String getState() {
        return state;
    }

    /** @param state the application state */
    public void setState(String state) {
        this.state = state;
    }

    /** @return the final status, or null if not set */
    public String getFinalStatus() {
        return finalStatus;
    }

    /** @param finalStatus the final status */
    public void setFinalStatus(String finalStatus) {
        this.finalStatus = finalStatus;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java new file mode 100644 index 0000000..f1d2cd1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.crawl; + +import org.apache.eagle.jpm.util.SparkJobTagName; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader { + + private String site; + private SparkApplicationInfo app; + + + public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app){ + this.site = site; + this.app = app; + } + + @Override + public void read(InputStream is) throws Exception { + Map<String, String> baseTags = new HashMap<>(); + baseTags.put(SparkJobTagName.SITE.toString(), site); + baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue()); + JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app)); + parser.parse(is); + } + + public static void main(String[] args) throws Exception{ + SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo()); + impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1"))); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java index 7c0530d..ffa2f22 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java @@ -23,11 +23,11 @@ import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import 
backtype.storm.topology.TopologyBuilder; import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt; import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; +import org.apache.eagle.jpm.util.Constants; import java.util.List; import java.util.regex.Pattern; @@ -37,14 +37,15 @@ public class MRHistoryJobMain { try { //1. trigger init conf JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args); + com.typesafe.config.Config jhfAppConf = jhfConfigManager.getConfig(); //2. init JobHistoryContentFilter JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile(); - List<String> confKeyPatterns = jhfConfigManager.getConfig().getStringList("MRConfigureKeys"); - confKeyPatterns.add(JPAConstants.JobConfiguration.CASCADING_JOB); - confKeyPatterns.add(JPAConstants.JobConfiguration.HIVE_JOB); - confKeyPatterns.add(JPAConstants.JobConfiguration.PIG_JOB); - confKeyPatterns.add(JPAConstants.JobConfiguration.SCOOBI_JOB); + List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys"); + confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB); + confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB); + confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB); + confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB); for (String key : confKeyPatterns) { builder.includeJobKeyPatterns(Pattern.compile(key)); @@ -54,10 +55,13 @@ public class MRHistoryJobMain { //3. 
init topology TopologyBuilder topologyBuilder = new TopologyBuilder(); String topologyName = "mrHistoryJobTopology"; + if (jhfAppConf.hasPath("envContextConfig.topologyName")) { + topologyName = jhfAppConf.getString("envContextConfig.topologyName"); + } String spoutName = "mrHistoryJobExecutor"; String boltName = "updateProcessTime"; - int parallelism = jhfConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName); - int tasks = jhfConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName); + int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName); + int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName); if (parallelism > tasks) { parallelism = tasks; } @@ -68,8 +72,8 @@ public class MRHistoryJobMain { ).setNumTasks(tasks); topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName); - backtype.storm.Config config = new backtype.storm.Config(); - config.setNumWorkers(jhfConfigManager.getConfig().getInt("envContextConfig.workers")); + Config config = new backtype.storm.Config(); + config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers")); config.put(Config.TOPOLOGY_DEBUG, true); if (!jhfConfigManager.getEnv().equals("local")) { //cluster mode http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java deleted file mode 100755 index feb5498..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.common; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JPAConstants { - - private final static Logger LOG = LoggerFactory.getLogger(JPAConstants.class); - - public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService"; - public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService"; - public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService"; - - public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService"; - public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService"; - public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService"; - public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService"; - public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService"; - - public static final String JOB_TASK_TYPE_TAG = "taskType"; - - public static class JobConfiguration { - // job type - public static final String SCOOBI_JOB = "scoobi.mode"; - public static final String HIVE_JOB = "hive.query.string"; - public static final 
String PIG_JOB = "pig.script"; - public static final String CASCADING_JOB = "cascading.app.name"; - } - - /** - * MR task types - */ - public enum TaskType { - SETUP, MAP, REDUCE, CLEANUP - } - - public enum JobType { - CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"), - NOTAVALIABLE("N/A") - ; - private String value; - JobType(String value){ - this.value = value; - } - @Override - public String toString() { - return this.value; - } - } - - public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter"; - public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter"; - - public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter"; - public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter"; - - public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "MapTaskAttemptFileSystemCounter"; - public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter"; - - public enum TaskAttemptCounter { - TASK_ATTEMPT_DURATION, - } - - - - private static final String DEFAULT_JOB_CONF_NORM_JOBNAME_KEY = "eagle.job.name"; - private static final String EAGLE_NORM_JOBNAME_CONF_KEY = "eagle.job.normalizedfieldname"; - - public static String JOB_CONF_NORM_JOBNAME_KEY = null; - - static { - if (JOB_CONF_NORM_JOBNAME_KEY == null) { - JOB_CONF_NORM_JOBNAME_KEY = DEFAULT_JOB_CONF_NORM_JOBNAME_KEY; - } - LOG.info("Loaded " + EAGLE_NORM_JOBNAME_CONF_KEY + " : " + JOB_CONF_NORM_JOBNAME_KEY); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java deleted file mode 100644 index f85f7bc..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.eagle.jpm.mr.history.common; - -import java.util.Map; -import java.util.TreeMap; - -public final class JobConfig { - private Map<String, String> config = new TreeMap<>(); - - public Map<String, String> getConfig() { - return config; - } - - public void setConfig(Map<String, String> config) { - this.config = config; - } - - public String toString(){ - return config.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java index d49cdef..964d68a 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java @@ -18,8 +18,8 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JobConfig; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.jobcounter.JobCountersSerDeser; import org.apache.eagle.log.entity.repo.EntityRepository; public class JPAEntityRepository extends EntityRepository { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java new file mode 100644 index 0000000..f1dc375 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import java.util.Map; +import java.util.TreeMap; + +public final class JobConfig { + private Map<String, String> config = new TreeMap<>(); + + public Map<String, String> getConfig() { + return config; + } + + public void setConfig(Map<String, String> config) { + this.config = config; + } + + public String toString(){ + return config.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java index 65f535f..8776f1f 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JobConfig; import org.apache.eagle.log.entity.meta.EntitySerDeser; import org.apache.hadoop.hbase.util.Bytes; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java index 44fa98c..295cc68 100755 --- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java @@ -18,8 +18,7 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; -import org.apache.eagle.jpm.mr.history.common.JobConfig; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -27,12 +26,12 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa") @ColumnFamily("f") @Prefix("jconf") -@Service(JPAConstants.JPA_JOB_CONFIG_SERVICE_NAME) +@Service(Constants.JPA_JOB_CONFIG_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true), - @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false) + @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false) }) public class JobConfigurationAPIEntity extends JobBaseAPIEntity { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java deleted file mode 100755 index 01044bb..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.entities; - -import org.apache.eagle.jpm.mr.history.jobcounter.*; -import org.apache.eagle.log.entity.meta.EntitySerDeser; -import org.apache.hadoop.hbase.util.Bytes; - -import java.util.Arrays; -import java.util.Map; -import java.util.TreeMap; - -public class JobCountersSerDeser implements EntitySerDeser<JobCounters> { - - private CounterGroupDictionary dictionary = null; - - @Override - public JobCounters deserialize(byte[] bytes) { - JobCounters counters = new JobCounters(); - final int length = bytes.length; - if (length < 4) { - return counters; - } - - final Map<String, Map<String, Long> > groupMap = counters.getCounters(); - int pos = 0; - final int totalGroups = Bytes.toInt(bytes, pos); - pos += 4; - - for (int i = 0; i < totalGroups; ++i) { - final int groupIndex = Bytes.toInt(bytes, pos); - pos += 4; - final int totalCounters = Bytes.toInt(bytes, pos); - pos += 4; - final int nextGroupPos = pos + (totalCounters * 12); - try { - final CounterGroupKey groupKey = getCounterGroup(groupIndex); - if (groupKey == null) { - throw new JobCounterException("Group index " + groupIndex + " is not defined"); - } - final Map<String, Long> counterMap = new TreeMap<String, Long>(); - groupMap.put(groupKey.getName(), 
counterMap); - for (int j = 0; j < totalCounters; ++j) { - final int counterIndex = Bytes.toInt(bytes, pos); - pos += 4; - final long value = Bytes.toLong(bytes, pos); - pos += 8; - final CounterKey counterKey = groupKey.getCounterKeyByID(counterIndex); - if (counterKey == null) { - continue; - } - counterMap.put(counterKey.getNames().get(0), value); - } - } catch (JobCounterException ex) { - // skip the group - pos = nextGroupPos; - } - } - return counters; - } - - @Override - public byte[] serialize(JobCounters counters) { - - final Map<String, Map<String, Long>> groupMap = counters.getCounters(); - int totalSize = 4; - for (Map<String, Long> counterMap : groupMap.values()) { - final int counterCount = counterMap.size(); - totalSize += counterCount * 12 + 8; - } - byte[] buffer = new byte[totalSize]; - - int totalGroups = 0; - int pos = 0; - int totalGroupNumberPos = pos; - pos += 4; - int nextGroupPos = pos; - - for (Map.Entry<String, Map<String, Long>> entry : groupMap.entrySet()) { - final String groupName = entry.getKey(); - final Map<String, Long> counterMap = entry.getValue(); - try { - nextGroupPos = pos = serializeGroup(buffer, pos, groupName, counterMap); - ++totalGroups; - } catch (JobCounterException ex) { - pos = nextGroupPos; - } - } - - Bytes.putInt(buffer, totalGroupNumberPos, totalGroups); - if (pos < totalSize) { - buffer = Arrays.copyOf(buffer, pos); - } - return buffer; - } - - @Override - public Class<JobCounters> type() { - return JobCounters.class; - } - - private int serializeGroup(byte[] buffer, int currentPos, String groupName, Map<String, Long> counterMap) throws JobCounterException { - int pos = currentPos; - final CounterGroupKey groupKey = getCounterGroup(groupName); - if (groupKey == null) { - throw new JobCounterException("Group name " + groupName + " is not defined"); - } - Bytes.putInt(buffer, pos, groupKey.getIndex()); - pos += 4; - int totalCounterNumberPos = pos; - pos += 4; - int totalCounters = 0; - - for (Map.Entry<String, 
Long> entry : counterMap.entrySet()) { - final String counterName = entry.getKey(); - final CounterKey counterKey = groupKey.getCounterKeyByName(counterName); - if (counterKey == null) { - continue; - } - final Long counterValue = entry.getValue(); - Bytes.putInt(buffer, pos, counterKey.getIndex()); - pos += 4; - Bytes.putLong(buffer, pos, counterValue); - pos += 8; - ++totalCounters; - } - Bytes.putInt(buffer, totalCounterNumberPos, totalCounters); - return pos; - } - - private CounterGroupKey getCounterGroup(String groupName) throws JobCounterException { - if (dictionary == null) { - dictionary = CounterGroupDictionary.getInstance(); - } - final CounterGroupKey groupKey = dictionary.getCounterGroupByName(groupName); - if (groupKey == null) { - throw new JobCounterException("Invalid counter group name: " + groupName); - } - return groupKey; - } - - private CounterGroupKey getCounterGroup(int groupIndex) throws JobCounterException { - if (dictionary == null) { - dictionary = CounterGroupDictionary.getInstance(); - } - return dictionary.getCounterGroupByIndex(groupIndex); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java index 3639ad0..31dd480 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import 
org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa") @ColumnFamily("f") @Prefix("jevent") -@Service(JPAConstants.JPA_JOB_EVENT_SERVICE_NAME) +@Service(Constants.JPA_JOB_EVENT_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) public class JobEventAPIEntity extends JobBaseAPIEntity {