http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml new file mode 100644 index 0000000..4f50350 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml @@ -0,0 +1,122 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-parent</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>eagle-jpm-spark-history</artifactId> + <name>eagle-jpm-spark-history</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-job-common</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.wso2.orbit.com.lmax</groupId> + <artifactId>disruptor</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-entity</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + <version>2.12</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/assembly/eagle-jpm-spark-history-assembly.xml</descriptor> + <finalName>eagle-jpm-spark-history-${project.version}</finalName> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <tarLongFileMode>posix</tarLongFileMode> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project>
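Note: the maven-assembly-plugin execution above binds the single goal to the package phase, so building this module alone should be enough to produce the assembly jar. A minimal invocation sketch, assuming it is run from the repository root with the parent POMs resolvable:

    mvn -pl eagle-jpm/eagle-jpm-spark-history -am package

With <finalName> set as above and the default appended assembly id, the output would be expected at eagle-jpm/eagle-jpm-spark-history/target/eagle-jpm-spark-history-0.5.0-incubating-SNAPSHOT-assembly.jar.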
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml b/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml new file mode 100644 index 0000000..66133a0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>assembly</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>false</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + <!--includes> + <include>org.apache.hadoop:hadoop-common</include> + <include>org.apache.hadoop:hadoop-hdfs</include> + <include>org.apache.hadoop:hadoop-client</include> + <include>org.apache.hadoop:hadoop-auth</include> + <include>org.apache.eagle:eagle-stream-process-api</include> + <include>org.apache.eagle:eagle-stream-process-base</include> + <include>org.jsoup:jsoup</include> + </includes--> + <excludes> + <exclude>org.wso2.orbit.com.lmax:disruptor</exclude> + <exclude>asm:asm</exclude> + <exclude>org.apache.storm:storm-core</exclude> + </excludes> + </dependencySet> + </dependencySets> + <fileSets> + <fileSet> + <directory>${project.build.outputDirectory}/</directory> + <outputDirectory>/</outputDirectory> + <!--<includes>--> + <!--<include>*.conf</include>--> + <!--<include>*.xml</include>--> + <!--<include>*.properties</include>--> + <!--<include>*.config</include>--> + <!--<include>classes/META-INF/*</include>--> + <!--</includes>--> + + <excludes> + <exclude>*.yaml</exclude> + </excludes> + </fileSet> + </fileSets> +</assembly> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java new file mode 100644 index 0000000..4282a64 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java @@ -0,0 +1,122 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.jpm.spark.history.config; + + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.io.Serializable; + +public class SparkHistoryCrawlConfig implements Serializable{ + public ZKStateConfig zkStateConfig; + public JobHistoryEndpointConfig jobHistoryConfig; + public HDFSConfig hdfsConfig; + public BasicInfo info; + public EagleInfo eagleInfo; + public StormConfig stormConfig; + + private Config config; + public Config getConfig() { + return config; + } + + public SparkHistoryCrawlConfig() { + this.config = ConfigFactory.load(); + + this.zkStateConfig = new ZKStateConfig(); + this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum"); + this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval"); + this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes"); + this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs"); + this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot"); + + this.jobHistoryConfig = new JobHistoryEndpointConfig(); + jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url"); + jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username"); + jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.pwd"); + jobHistoryConfig.rms = config.getStringList("dataSourceConfig.rm.url").toArray(new String[0]); + + this.hdfsConfig = new HDFSConfig(); + this.hdfsConfig.baseDir = config.getString("dataSourceConfig.hdfs.baseDir"); + this.hdfsConfig.endpoint = config.getString("dataSourceConfig.hdfs.endPoint"); + this.hdfsConfig.principal = config.getString("dataSourceConfig.hdfs.principal"); + this.hdfsConfig.keytab = config.getString("dataSourceConfig.hdfs.keytab"); + + this.info = new BasicInfo(); + info.site = String.format("%s-%s",config.getString("basic.cluster"),config.getString("basic.datacenter")); + info.jobConf = config.getStringList("basic.jobConf.additional.info").toArray(new String[0]); + + this.eagleInfo = new EagleInfo(); + this.eagleInfo.host = config.getString("eagleProps.eagle.service.host"); + this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port"); + + this.stormConfig = new StormConfig(); + 
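//Typesafe Config throws ConfigException.Missing for any absent key, so every dataSourceConfig.*, basic.*, eagleProps.* and storm.* path read in this constructor must be present in the application.conf picked up by ConfigFactory.load() +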
this.stormConfig.mode = config.getString("storm.mode"); + this.stormConfig.topologyName = config.getString("storm.name"); + this.stormConfig.workerNo = config.getInt("storm.workerNo"); + this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec"); + this.stormConfig.spoutPending = config.getInt("storm.pendingSpout"); + this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval"); + } + + public static class ZKStateConfig implements Serializable { + public String zkQuorum; + public String zkRoot; + public int zkSessionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; + } + + public static class JobHistoryEndpointConfig implements Serializable { + public String[] rms; + public String historyServerUrl; + public String historyServerUserName; + public String historyServerUserPwd; + } + + public static class HDFSConfig implements Serializable { + public String endpoint; + public String baseDir; + public String principal; + public String keytab; + } + + public static class BasicInfo implements Serializable { + public String site; + public String[] jobConf; + } + + public static class StormConfig implements Serializable { + public String mode; + public int workerNo; + public int timeoutSec; + public String topologyName; + public int spoutPending; + public int spoutCrawlInterval; + } + + public static class EagleInfo implements Serializable { + public String host; + public int port; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java new file mode 100644 index 0000000..9c720c8 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.history.crawl; + +public enum EventType { + SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart, + SparkListenerExecutorAdded, SparkListenerJobStart, SparkListenerStageSubmitted, SparkListenerTaskStart, SparkListenerBlockManagerRemoved, + SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd, SparkListenerExecutorRemoved +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java new file mode 100644 index 0000000..1f76a2f --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.history.crawl; + +import java.io.InputStream; + +public interface JHFInputStreamReader { + void read(InputStream is) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java new file mode 100644 index 0000000..3fbc769 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.history.crawl; + +import java.io.InputStream; + +public interface JHFParserBase { + /** + * Parses the given input stream; implementations must ensure the stream is closed when parsing finishes. + * @param is the input stream to parse + * @throws Exception + */ + void parse(InputStream is) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java new file mode 100644 index 0000000..db5d432 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.eagle.jpm.spark.history.crawl; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.jpm.entity.*; +import org.apache.eagle.jpm.util.JSONUtil; +import org.apache.eagle.jpm.util.JobNameNormalization; +import org.apache.eagle.jpm.util.SparkEntityConstant; +import org.apache.eagle.jpm.util.SparkJobTagName; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.service.client.impl.EagleServiceBaseClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class JHFSparkEventReader { + + public static final int FLUSH_LIMIT = 500; + private static final Logger logger = LoggerFactory.getLogger(JHFSparkEventReader.class); + + private Map<String, SparkExecutor> executors; + private SparkApp app; + private Map<Integer, SparkJob> jobs; + private Map<String, SparkStage> stages; + private Map<Integer, Set<String>> jobStageMap; + private Map<Integer, SparkTask> tasks; + private EagleServiceClientImpl client; + private Map<String, Map<Integer, Boolean>> stageTaskStatusMap; + + private List<TaggedLogAPIEntity> createEntities; + + private Config conf; + + public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) { + app = new SparkApp(); + app.setTags(new HashMap<String, String>(baseTags)); + app.setYarnState(info.getState()); + app.setYarnStatus(info.getFinalStatus()); + createEntities = new ArrayList<>(); + jobs = new HashMap<Integer, SparkJob>(); + stages = new HashMap<String, SparkStage>(); + jobStageMap = new HashMap<Integer, Set<String>>(); + tasks = new HashMap<Integer, SparkTask>(); + executors = new HashMap<String, SparkExecutor>(); + stageTaskStatusMap = new HashMap<>(); + conf = ConfigFactory.load(); + this.initiateClient(); + } + + public void read(JSONObject eventObj) throws Exception { + String eventType = (String) eventObj.get("Event"); + if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) { + handleAppStarted(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) { + handleEnvironmentSet(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) { + handleExecutorAdd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) { + handleBlockManagerAdd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) { + handleJobStart(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) { + handleStageSubmit(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) { + handleTaskStart(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) { + handleTaskEnd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) { + handleStageComplete(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) { + handleJobEnd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) { + handleExecutorRemoved(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) { +
handleAppEnd(eventObj); + } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) { + //nothing to do now + } else { + logger.info("Not registered event type:" + eventType); + } + + } + + + private void handleEnvironmentSet(JSONObject event) { + app.setConfig(new JobConfig()); + JSONObject sparkProps = (JSONObject) event.get("Spark Properties"); + + //copy into a mutable list: Config.getStringList() returns an immutable list, so addAll() would otherwise throw UnsupportedOperationException + List<String> jobConfs = new ArrayList<>(conf.getStringList("basic.jobConf.additional.info")); + String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port", + "spark.driver.memory", "ebay.job.name", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", + "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; + jobConfs.addAll(Arrays.asList(props)); + for (String prop : jobConfs) { + if (sparkProps.containsKey(prop)) { + app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop)); + } + } + } + + private Object getConfigVal(JobConfig config, String configName, String type) { + if (config.getConfig().containsKey(configName)) { + Object val = config.getConfig().get(configName); + if (type.equalsIgnoreCase(Integer.class.getName())) { + return Integer.parseInt((String) val); + } else { + return val; + } + } else { + if (type.equalsIgnoreCase(Integer.class.getName())) { + return conf.getInt("spark.defaultVal." + configName); + } else { + return conf.getString("spark.defaultVal." + configName); + } + + } + } + + + private boolean isClientMode(JobConfig config) { + //constant-first comparison also avoids an NPE when spark.master is absent + return "yarn-client".equalsIgnoreCase(config.getConfig().get("spark.master")); + } + + + private void handleAppStarted(JSONObject event) { + //need update all entities tag before app start + List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); + entities.addAll(this.executors.values()); + entities.add(this.app); + + for (TaggedLogAPIEntity entity : entities) { + entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtil.getString(event, "App ID")); + entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtil.getString(event, "App Name")); + entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), JSONUtil.getString(event, "App Attempt ID")); + entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtil.getString(event, "App Name"), this.app.getConfig().getConfig().get("ebay.job.name"))); + entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtil.getString(event, "User")); + } + + this.app.setStartTime(JSONUtil.getLong(event, "Timestamp")); + this.app.setTimestamp(JSONUtil.getLong(event, "Timestamp")); + + } + + private void handleExecutorAdd(JSONObject event) throws Exception { + String executorID = (String) event.get("Executor ID"); + SparkExecutor executor = this.initiateExecutor(executorID, JSONUtil.getLong(event, "Timestamp")); + + JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor Info"); + + } + + private void handleBlockManagerAdd(JSONObject event) throws Exception { + long maxMemory = JSONUtil.getLong(event, "Maximum Memory"); + long timestamp = JSONUtil.getLong(event, "Timestamp"); + JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID"); + String executorID = JSONUtil.getString(blockInfo, "Executor ID"); + String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo,
"Port")); + + SparkExecutor executor = this.initiateExecutor(executorID, timestamp); + executor.setMaxMemory(maxMemory); + executor.setHostPort(hostport); + } + + private void handleTaskStart(JSONObject event) { + this.initializeTask(event); + } + + private void handleTaskEnd(JSONObject event) { + JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info"); + Integer taskId = JSONUtil.getInt(taskInfo, "Task ID"); + SparkTask task = null; + if (tasks.containsKey(taskId)) { + task = tasks.get(taskId); + } else { + return; + } + + task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed")); + JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics"); + if (null != taskMetrics) { + task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, "Executor Deserialize Time")); + task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor Run Time")); + task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time")); + task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size")); + task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, "Result Serialization Time")); + task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory Bytes Spilled")); + task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes Spilled")); + + JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, "Input Metrics"); + if (null != inputMetrics) { + task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes Read")); + task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records Read")); + } + + JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, "Output Metrics"); + if (null != outputMetrics) { + task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes Written")); + task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records Written")); + } + + JSONObject shuffleWriteMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics"); + if (null != shuffleWriteMetrics) { + task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes Written")); + task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Records Written")); + } + + JSONObject shuffleReadMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics"); + if (null != shuffleReadMetrics) { + task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes Read")); + task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote Bytes Read")); + task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records Read")); + } + } else { + //for tasks success without task metrics, save in the end if no other information + if (!task.isFailed()) { + return; + } + } + + aggregateToStage(task); + aggregateToExecutor(task); + tasks.remove(taskId); + this.flushEntities(task, false); + } + + private SparkTask initializeTask(JSONObject event) { + SparkTask task = new SparkTask(); + task.setTags(new HashMap(this.app.getTags())); + task.setTimestamp(app.getTimestamp()); + + task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), JSONUtil.getLong(event, "Stage ID").toString()); + task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), JSONUtil.getLong(event, "Stage Attempt ID").toString()); + + JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info"); + task.setTaskId(JSONUtil.getInt(taskInfo, "Task ID")); + task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), JSONUtil.getInt(taskInfo, "Index").toString()); + task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), 
JSONUtil.getInt(taskInfo, "Attempt").toString()); + task.setLaunchTime(JSONUtil.getLong(taskInfo, "Launch Time")); + task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID")); + task.setHost(JSONUtil.getString(taskInfo, "Host")); + task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality")); + task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative")); + + tasks.put(task.getTaskId(), task); + return task; + } + + private void handleJobStart(JSONObject event) { + SparkJob job = new SparkJob(); + job.setTags(new HashMap(this.app.getTags())); + job.setTimestamp(app.getTimestamp()); + + Integer jobId = JSONUtil.getInt(event, "Job ID"); + job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString()); + job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time")); + + //for complete application, no active stages/tasks + job.setNumActiveStages(0); + job.setNumActiveTasks(0); + + this.jobs.put(jobId, job); + this.jobStageMap.put(jobId, new HashSet<String>()); + + JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos"); + job.setNumStages(stages.size()); + for (int i = 0; i < stages.size(); i++) { + JSONObject stageInfo = (JSONObject) stages.get(i); + Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID"); + Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID"); + String stageName = JSONUtil.getString(stageInfo, "Stage Name"); + int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks"); + this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks); + + } + + } + + private void handleStageSubmit(JSONObject event) { + JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info"); + Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID"); + Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID"); + String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString()); + stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>()); + + if (!stages.containsKey(this.generateStageKey(stageId.toString(), stageAttemptId.toString()))) { + //may be further attempt for one stage + String baseAttempt = this.generateStageKey(stageId.toString(), "0"); + if (stages.containsKey(baseAttempt)) { + SparkStage stage = stages.get(baseAttempt); + String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()); + + //"Stage Name" is a field of "Stage Info", not of the event itself (same as in handleJobStart) + String stageName = JSONUtil.getString(stageInfo, "Stage Name"); + int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks"); + this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks); + } + } + + } + + private void handleStageComplete(JSONObject event) { + JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info"); + Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID"); + Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID"); + String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString()); + SparkStage stage = stages.get(key); + stage.setSubmitTime(JSONUtil.getLong(stageInfo, "Submission Time")); + stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time")); + + if (stageInfo.containsKey("Failure Reason")) { + stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString()); + } else { + stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString()); + } + } + + private void handleExecutorRemoved(JSONObject event) { + String executorID = JSONUtil.getString(event, "Executor ID"); + SparkExecutor executor = executors.get(executorID); + executor.setEndTime(JSONUtil.getLong(event,
"Timestamp")); + + } + + private void handleJobEnd(JSONObject event) { + Integer jobId = JSONUtil.getInt(event, "Job ID"); + SparkJob job = jobs.get(jobId); + job.setCompletionTime(JSONUtil.getLong(event, "Completion Time")); + JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result"); + String result = JSONUtil.getString(jobResult, "Result"); + if (result.equalsIgnoreCase("JobSucceeded")) { + job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString()); + } else { + job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString()); + } + + } + + private void handleAppEnd(JSONObject event) { + long endTime = JSONUtil.getLong(event, "Timestamp"); + app.setEndTime(endTime); + } + + public void clearReader() throws Exception { + //clear tasks + for (SparkTask task : tasks.values()) { + logger.info("Task {} does not have result or no task metrics.", task.getTaskId()); + task.setFailed(true); + aggregateToStage(task); + aggregateToExecutor(task); + this.flushEntities(task, false); + } + + List<SparkStage> needStoreStages = new ArrayList<>(); + for (SparkStage stage : this.stages.values()) { + Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); + if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) { + SparkJob job = this.jobs.get(jobId); + job.setNumSkippedStages(job.getNumSkippedStages() + 1); + job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks()); + } else { + this.aggregateToJob(stage); + this.aggregateStageToApp(stage); + needStoreStages.add(stage); + } + String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); + String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); + this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId)); + } + + this.flushEntities(needStoreStages, false); + for (SparkJob job : jobs.values()) { + this.aggregateJobToApp(job); + } + this.flushEntities(jobs.values(), false); + + app.setExecutors(executors.values().size()); + long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName())); + long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig()) ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName())); + int executoreCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName()); + int driverCore = this.isClientMode(app.getConfig()) ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName()); + long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead"); + long driverMemoryOverhead = this.isClientMode(app.getConfig()) ? 
this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead"); + + app.setExecMemoryBytes(executorMemory); + app.setDriveMemoryBytes(driverMemory); + app.setExecutorCores(executoreCore); + app.setDriverCores(driverCore); + app.setExecutorMemoryOverhead(executorMemoryOverhead); + app.setDriverMemoryOverhead(driverMemoryOverhead); + + for (SparkExecutor executor : executors.values()) { + String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString()); + if (executorID.equalsIgnoreCase("driver")) { + executor.setExecMemoryBytes(driverMemory); + executor.setCores(driverCore); + executor.setMemoryOverhead(driverMemoryOverhead); + } else { + executor.setExecMemoryBytes(executorMemory); + executor.setCores(executoreCore); + executor.setMemoryOverhead(executorMemoryOverhead); + } + if (executor.getEndTime() == 0) + executor.setEndTime(app.getEndTime()); + this.aggregateExecutorToApp(executor); + } + this.flushEntities(executors.values(), false); + //spark code...tricky + app.setSkippedTasks(app.getCompleteTasks()); + this.flushEntities(app, true); + } + + private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) { + long result = 0l; + if (config.getConfig().containsKey(fieldName)) { + result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m"); + if(result == 0l){ + result = this.parseExecutorMemory(config.getConfig().get(fieldName)); + } + } + + if(result == 0l){ + result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100); + } + return result; + } + + private void aggregateExecutorToApp(SparkExecutor executor) { + app.setTotalExecutorTime(app.getTotalExecutorTime() + (executor.getEndTime() - executor.getStartTime())); + } + + private void aggregateJobToApp(SparkJob job) { + //aggregate job level metrics + app.setNumJobs(app.getNumJobs() + 1); + app.setTotalTasks(app.getTotalTasks() + job.getNumTask()); + app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks()); + app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks()); + app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks()); + app.setTotalStages(app.getTotalStages() + job.getNumStages()); + app.setFailedStages(app.getFailedStages() + job.getNumFailedStages()); + app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages()); + } + + private void aggregateStageToApp(SparkStage stage) { + //aggregate task level metrics + app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled()); + app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled()); + app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime()); + app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime()); + app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime()); + app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime()); + app.setResultSize(app.getResultSize() + stage.getResultSize()); + app.setInputRecords(app.getInputRecords() + stage.getInputRecords()); + app.setInputBytes(app.getInputBytes() + stage.getInputBytes()); + app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords()); + app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes()); + 
app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords()); + app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes()); + app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords()); + app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes()); + } + + private void aggregateToStage(SparkTask task) { + String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); + String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); + String key = this.generateStageKey(stageId, stageAttemptId); + SparkStage stage = stages.get(key); + + stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled()); + stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled()); + stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime()); + stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime()); + stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime()); + stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime()); + stage.setResultSize(stage.getResultSize() + task.getResultSize()); + stage.setInputRecords(stage.getInputRecords() + task.getInputRecords()); + stage.setInputBytes(stage.getInputBytes() + task.getInputBytes()); + stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords()); + stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes()); + stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords()); + stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes()); + stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords()); + long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); + stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes); + + boolean success = !task.isFailed(); + + Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString())); + if (stageTaskStatusMap.get(key).containsKey(taskIndex)) { + //has previous task attempt, retrieved from task index in one stage + boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex); + success = previousResult || success; + if (previousResult != success) { + stage.setNumFailedTasks(stage.getNumFailedTasks() - 1); + stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); + stageTaskStatusMap.get(key).put(taskIndex, success); + } + } else { + if (success) { + stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); + } else { + stage.setNumFailedTasks(stage.getNumFailedTasks() + 1); + } + stageTaskStatusMap.get(key).put(taskIndex, success); + } + + } + + private void aggregateToExecutor(SparkTask task) { + String executorId = task.getExecutorId(); + SparkExecutor executor = executors.get(executorId); + + if (null != executor) { + executor.setTotalTasks(executor.getTotalTasks() + 1); + if (task.isFailed()) { + executor.setFailedTasks(executor.getFailedTasks() + 1); + } else { + executor.setCompletedTasks(executor.getCompletedTasks() + 1); + } + long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); + executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes); + executor.setTotalDuration(executor.getTotalDuration() + 
task.getExecutorRunTime()); + executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes()); + executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes()); + executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime()); + } + + } + + private void aggregateToJob(SparkStage stage) { + Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); + SparkJob job = jobs.get(jobId); + job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks()); + job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks()); + job.setNumTask(job.getNumTask() + stage.getNumTasks()); + + + if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) { + //if multiple attempts succeed, just count one + if (!hasStagePriorAttemptSuccess(stage)) { + job.setNumCompletedStages(job.getNumCompletedStages() + 1); + } + + } else { + job.setNumFailedStages(job.getNumFailedStages() + 1); + } + } + + private boolean hasStagePriorAttemptSuccess(SparkStage stage) { + Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString())); + for (Integer i = 0; i < stageAttemptId; i++) { + SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), i.toString())); + if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) { + return true; + } + } + return false; + } + + + private String generateStageKey(String stageId, String stageAttemptId) { + return String.format("%s-%s", stageId, stageAttemptId); + } + + private void initiateStage(Integer jobId, Integer stageId, Integer stageAttemptId, String name, int numTasks) { + SparkStage stage = new SparkStage(); + stage.setTags(new HashMap(this.app.getTags())); + stage.setTimestamp(app.getTimestamp()); + stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString()); + stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageId.toString()); + stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptId.toString()); + stage.setName(name); + stage.setNumActiveTasks(0); + stage.setNumTasks(numTasks); + stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ? 
"default" : this.app.getConfig().getConfig().get("spark.scheduler.pool")); + + String stageKey = this.generateStageKey(stageId.toString(), stageAttemptId.toString()); + stages.put(stageKey, stage); + this.jobStageMap.get(jobId).add(stageKey); + } + + + private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception { + if (!executors.containsKey(executorID)) { + SparkExecutor executor = new SparkExecutor(); + executor.setTags(new HashMap(this.app.getTags())); + executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID); + executor.setStartTime(startTime); + executor.setTimestamp(app.getTimestamp()); + + this.executors.put(executorID, executor); + } + + return this.executors.get(executorID); + } + + private String getNormalizedName(String jobName, String assignedName) { + if (null != assignedName) { + return assignedName; + } else { + return JobNameNormalization.getInstance().normalize(jobName); + } + } + + private long parseExecutorMemory(String memory) { + + if (memory.endsWith("g") || memory.endsWith("G")) { + int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1)); + return 1024l * 1024 * 1024 * executorGB; + } else if (memory.endsWith("m") || memory.endsWith("M")) { + int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1)); + return 1024l * 1024 * executorMB; + } else if (memory.endsWith("k") || memory.endsWith("K")) { + int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1)); + return 1024l * executorKB; + } else if (memory.endsWith("t") || memory.endsWith("T")) { + int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1)); + return 1024l * 1024 * 1024 * 1024 * executorTB; + } else if (memory.endsWith("p") || memory.endsWith("P")) { + int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1)); + return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB; + } + Log.info("Cannot parse memory info " + memory); + return 0l; + } + + private void flushEntities(Object entity, boolean forceFlush) { + this.flushEntities(Arrays.asList(entity), forceFlush); + } + + private void flushEntities(Collection entities, boolean forceFlush) { + this.createEntities.addAll(entities); + + if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) { + try { + this.doFlush(this.createEntities); + this.createEntities.clear(); + } catch (Exception e) { + logger.error("Fail to flush entities", e); + } + + } + } + + private EagleServiceBaseClient initiateClient() { + String host = conf.getString("eagleProps.eagle.service.host"); + int port = conf.getInt("eagleProps.eagle.service.port"); + String userName = conf.getString("eagleProps.eagle.service.userName"); + String pwd = conf.getString("eagleProps.eagle.service.pwd"); + client = new EagleServiceClientImpl(host, port, userName, pwd); + int timeout = conf.getInt("eagleProps.eagle.service.read_timeout"); + client.getJerseyClient().setReadTimeout(timeout * 1000); + + return client; + } + + private void doFlush(List entities) throws Exception { + logger.info("start flushing entities of total number " + entities.size()); + client.create(entities); + logger.info("finish flushing entities of total number " + entities.size()); +// for(Object entity: entities){ +// if(entity instanceof SparkApp){ +// for (Field field : entity.getClass().getDeclaredFields()) { +// field.setAccessible(true); // You might want to set modifier to public first. 
+// Object value = field.get(entity); +// if (value != null) { +// System.out.println(field.getName() + "=" + value); +// } +// } +// } +// } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java new file mode 100644 index 0000000..75ce508 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.history.crawl; + + +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class JHFSparkParser implements JHFParserBase { + + private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class); + + private final JHFSparkEventReader eventReader; + + public JHFSparkParser(JHFSparkEventReader reader) { + this.eventReader = reader; + } + + @Override + public void parse(InputStream is) throws Exception { + //try-with-resources closes the reader, and with it the underlying input stream, on every exit path + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) { + String line; + JSONParser parser = new JSONParser(); + while ((line = reader.readLine()) != null) { + try { + JSONObject eventObj = (JSONObject) parser.parse(line); + this.eventReader.read(eventObj); + } catch (Exception e) { + logger.error(String.format("Fail to parse %s.", line), e); + } + } + this.eventReader.clearReader(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java new file mode 100644 index 0000000..c206b71 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java @@ -0,0 +1,69 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor
license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.jpm.spark.history.crawl; + +public class SparkApplicationInfo { + + private String state; + private String finalStatus; + private String queue; + private String name; + private String user; + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public String getFinalStatus() { + return finalStatus; + } + + public void setFinalStatus(String finalStatus) { + this.finalStatus = finalStatus; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java new file mode 100644 index 0000000..38c0a04 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.history.crawl; + +import org.apache.eagle.jpm.util.SparkJobTagName; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +public class SparkHistoryFileInputStreamReaderImpl implements JHFInputStreamReader { + + private String site; + private SparkApplicationInfo app; + + + public SparkHistoryFileInputStreamReaderImpl(String site, SparkApplicationInfo app){ + this.site = site; + this.app = app; + } + + @Override + public void read(InputStream is) throws Exception { + Map<String, String> baseTags = new HashMap<>(); + baseTags.put(SparkJobTagName.SITE.toString(), site); + baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue()); + JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app)); + parser.parse(is); + } + + public static void main(String[] args) throws Exception{ + SparkHistoryFileInputStreamReaderImpl impl = new SparkHistoryFileInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo()); + impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1"))); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java new file mode 100644 index 0000000..60e126e --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java @@ -0,0 +1,262 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
new file mode 100644
index 0000000..60e126e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.status;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class JobHistoryZKStateManager {
+    public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+    private String zkRoot;
+    private CuratorFramework _curator;
+    private static final String START_TIMESTAMP = "lastAppTime";
+
+    private CuratorFramework newCurator(SparkHistoryCrawlConfig config) throws Exception {
+        return CuratorFrameworkFactory.newClient(
+            config.zkStateConfig.zkQuorum,
+            config.zkStateConfig.zkSessionTimeoutMs,
+            15000,
+            new RetryNTimes(config.zkStateConfig.zkRetryTimes, config.zkStateConfig.zkRetryInterval)
+        );
+    }
+
+    public JobHistoryZKStateManager(SparkHistoryCrawlConfig config) {
+        this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
+
+        try {
+            _curator = newCurator(config);
+            _curator.start();
+        } catch (Exception e) {
+            LOG.error("Fail to connect to zookeeper", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void close() {
+        _curator.close();
+        _curator = null;
+    }
+
+    public List<String> loadApplications(int limit) {
+        String jobPath = zkRoot + "/jobs";
+        List<String> apps = new ArrayList<>();
+        InterProcessLock lock = new InterProcessReadWriteLock(_curator, jobPath).writeLock();
+        try {
+            lock.acquire();
+            Iterator<String> iter = _curator.getChildren().forPath(jobPath).iterator();
+            while (iter.hasNext()) {
+                String appId = iter.next();
+                String path = jobPath + "/" + appId;
+                if (_curator.checkExists().forPath(path) != null) {
+                    if (new String(_curator.getData().forPath(path)).equals(ZKStateConstant.AppStatus.INIT.toString())) {
+                        apps.add(appId);
+                    }
+                }
+                if (apps.size() == limit) {
+                    break;
+                }
+            }
+            return apps;
+        } catch (Exception e) {
+            LOG.error("fail to read unprocessed jobs", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                lock.release();
+            } catch (Exception e) {
+                LOG.error("fail to release lock", e);
+            }
+        }
+    }
+
+    public void resetApplications() {
+        String jobPath = zkRoot + "/jobs";
+        InterProcessLock lock = new InterProcessReadWriteLock(_curator, jobPath).writeLock();
+        try {
+            lock.acquire();
+            Iterator<String> iter = _curator.getChildren().forPath(jobPath).iterator();
+            while (iter.hasNext()) {
+                String appId = iter.next();
+                String path = jobPath + "/" + appId;
+                try {
+                    if (_curator.checkExists().forPath(path) != null) {
+                        String status = new String(_curator.getData().forPath(path));
+                        if (!ZKStateConstant.AppStatus.INIT.toString().equals(status)) {
+                            _curator.setData().forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("fail to read unprocessed job", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("fail to read unprocessed jobs", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                lock.release();
+            } catch (Exception e) {
+                LOG.error("fail to release lock", e);
+            }
+        }
+    }
+
+    public SparkApplicationInfo getApplicationInfo(String appId) {
+        String appPath = zkRoot + "/jobs/" + appId + "/info";
+        try {
+            SparkApplicationInfo info = new SparkApplicationInfo();
+            if (_curator.checkExists().forPath(appPath) != null) {
+                String[] appStatus = new String(_curator.getData().forPath(appPath)).split("/");
+                info.setQueue(appStatus[0]);
+                info.setState(appStatus[1]);
+                info.setFinalStatus(appStatus[2]);
+                if (appStatus.length > 3) {
+                    info.setUser(appStatus[3]);
+                    info.setName(appStatus[4]);
+                }
+            }
+            return info;
+        } catch (Exception e) {
+            LOG.error("fail to read application attempt info", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public long readLastFinishedTimestamp() {
+        String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
+        try {
+            if (_curator.checkExists().forPath(lastTimeStampPath) == null) {
+                return 0L;
+            } else {
+                return Long.valueOf(new String(_curator.getData().forPath(lastTimeStampPath)));
+            }
+        } catch (Exception e) {
+            LOG.error("fail to read last finished spark job timestamp", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean hasApplication(String appId) {
+        String path = zkRoot + "/jobs/" + appId;
+        try {
+            return _curator.checkExists().forPath(path) != null;
+        } catch (Exception e) {
+            LOG.error("fail to check whether application exists", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void addFinishedApplication(String appId, String queue, String yarnState, String yarnStatus, String user, String name) {
+        String path = zkRoot + "/jobs/" + appId;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                _curator.delete().deletingChildrenIfNeeded().forPath(path);
+            }
+
+            name = name.replace("/", "_");
+            if (name.length() > 50) {
+                name = name.substring(0, 50);
+            }
+
+            CuratorTransactionBridge result = _curator.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+            result = result.and().create().withMode(CreateMode.PERSISTENT).forPath(path + "/info", String.format("%s/%s/%s/%s/%s", queue, yarnState, yarnStatus, user, name).getBytes("UTF-8"));
+            result.and().commit();
+        } catch (Exception e) {
+            LOG.error("fail adding finished application", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void updateLastUpdateTime(Long updateTime) {
+        String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
+        try {
+            if (_curator.checkExists().forPath(lastTimeStampPath) == null) {
+                _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+            } else {
+                long originalEndTime = this.readLastFinishedTimestamp();
+                if (originalEndTime < updateTime) {
+                    _curator.setData().forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("fail to update last finished time", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status) {
+        String path = zkRoot + "/jobs/" + appId;
+        InterProcessLock lock = new InterProcessReadWriteLock(_curator, zkRoot + "/jobs").readLock();
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                if (status.equals(ZKStateConstant.AppStatus.FINISHED)) {
+                    lock.acquire();
+                    _curator.delete().deletingChildrenIfNeeded().forPath(path);
+                } else {
+                    _curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
+                }
+            } else {
+                String errorMsg = String.format("fail to update application status: path %s does not exist", path);
+                LOG.error(errorMsg);
+            }
+        } catch (Exception e) {
+            LOG.error("fail to update application status", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                if (lock.isAcquiredInThisProcess()) {
+                    lock.release();
+                }
+            } catch (Exception e) {
+                LOG.error("fail to release lock", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
new file mode 100644
index 0000000..40efa50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.status;
+
+public class ZKStateConstant {
+
+    public enum AppStatus {
+        INIT, SENT_FOR_PARSE, FINISHED, FAILED
+    }
+}
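Taken together, JobHistoryZKStateManager and ZKStateConstant implement a small per-application state machine in ZooKeeper: each app gets a znode at {zkRoot}/{site}/jobs/{appId} holding one of INIT, SENT_FOR_PARSE, FINISHED, or FAILED, plus a /info child carrying "queue/state/finalStatus/user/name", while {zkRoot}/{site}/lastAppTime acts as the crawl watermark. Marking an app FINISHED deletes its znode, so completed work never reappears in loadApplications. A minimal lifecycle sketch, assuming an application.conf that points SparkHistoryCrawlConfig at a reachable ZooKeeper quorum; the app id and metadata below are hypothetical:

import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;

public class ZKStateLifecycleSketch {
    public static void main(String[] args) {
        SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
        JobHistoryZKStateManager zk = new JobHistoryZKStateManager(config);

        // Registering a finished app creates two persistent znodes in one transaction:
        //   {zkRoot}/{site}/jobs/{appId}       -> "INIT"
        //   {zkRoot}/{site}/jobs/{appId}/info  -> "queue/state/finalStatus/user/name"
        String appId = "application_1459803563374_0001"; // hypothetical app id
        if (!zk.hasApplication(appId)) {
            zk.addFinishedApplication(appId, "default", "FINISHED", "SUCCEEDED", "hadoop", "sample job");
        }

        // The spout later claims up to N INIT apps under a write lock
        // and advances each one to SENT_FOR_PARSE.
        for (String id : zk.loadApplications(10)) {
            zk.updateApplicationStatus(id, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
        }

        // A successfully parsed app is marked FINISHED, which deletes its znode.
        zk.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
        zk.close();
    }
}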
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
new file mode 100644
index 0000000..9c231aa
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FinishedSparkJobSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(FinishedSparkJobSpout.class);
+    private SpoutOutputCollector collector;
+    private JobHistoryZKStateManager zkState;
+    private SparkHistoryCrawlConfig config;
+    private ResourceFetcher rmFetch;
+    private long lastFinishAppTime = 0;
+    private Map<String, Integer> failTimes;
+
+    private static final int FAIL_MAX_TIMES = 5;
+
+    public FinishedSparkJobSpout(SparkHistoryCrawlConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
+        this.failTimes = new HashMap<>();
+        this.collector = spoutOutputCollector;
+        this.zkState = new JobHistoryZKStateManager(config);
+        this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
+        zkState.resetApplications();
+    }
+
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to run tuple");
+        try {
+            long fetchTime = Calendar.getInstance().getTimeInMillis();
+            if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
+                List apps = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, String.valueOf(lastFinishAppTime));
+                List<AppInfo> appInfos = (null != apps ? (List<AppInfo>) apps.get(0) : new ArrayList<AppInfo>());
+                LOG.info("Got {} finished apps from the yarn resource manager.", appInfos.size());
+                for (AppInfo app : appInfos) {
+                    String appId = app.getId();
+                    if (!zkState.hasApplication(appId)) {
+                        zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
+                    }
+                }
+                this.lastFinishAppTime = fetchTime;
+                zkState.updateLastUpdateTime(fetchTime);
+            }
+
+            List<String> appIds = zkState.loadApplications(10);
+            for (String appId : appIds) {
+                collector.emit(new Values(appId), appId);
+                LOG.info("Emit {}", appId);
+                zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+            }
+            LOG.info("{} apps sent.", appIds.size());
+
+            if (appIds.isEmpty()) {
+                this.takeRest(60);
+            }
+        } catch (Exception e) {
+            LOG.error("Fail to run next tuple", e);
+            // this.takeRest(10);
+        }
+    }
+
+    private void takeRest(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId"));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        String appId = (String) msgId;
+        int failTimes = 0;
+        if (this.failTimes.containsKey(appId)) {
+            failTimes = this.failTimes.get(appId);
+        }
+        failTimes++;
+        if (failTimes >= FAIL_MAX_TIMES) {
+            this.failTimes.remove(appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.error(String.format("Application %s has failed over %d times, dropping it.", appId, FAIL_MAX_TIMES));
+        } else {
+            this.failTimes.put(appId, failTimes);
+            collector.emit(new Values(appId), appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        String appId = (String) msgId;
+        this.failTimes.remove(appId);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        zkState.close();
+    }
+}
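The spout gives each application at-least-once delivery: tuples are emitted with the appId as the Storm message id, so fail() can re-emit the same app until FAIL_MAX_TIMES is reached, at which point the app is force-marked FINISHED (deleting its znode) so a bad event log can never wedge the queue. A self-contained sketch of that bookkeeping in isolation; the class and method names are illustrative only, and the cap of 5 mirrors FAIL_MAX_TIMES above:

import java.util.HashMap;
import java.util.Map;

public class RetryPolicySketch {
    private static final int FAIL_MAX_TIMES = 5;
    private final Map<String, Integer> failTimes = new HashMap<>();

    // Returns true when the failed app should be re-emitted for another parse attempt.
    public boolean shouldRetry(String appId) {
        int attempts = failTimes.getOrDefault(appId, 0) + 1;
        if (attempts >= FAIL_MAX_TIMES) {
            failTimes.remove(appId); // give up; the spout marks the app FINISHED in ZK
            return false;
        }
        failTimes.put(appId, attempts); // re-emit with appId as the message id
        return true;
    }

    public static void main(String[] args) {
        RetryPolicySketch policy = new RetryPolicySketch();
        String appId = "application_1459803563374_0002"; // hypothetical app id
        for (int i = 1; i <= 6; i++) {
            System.out.println(i + " -> retry=" + policy.shouldRetry(appId));
        }
        // retry=true four times; the fifth consecutive failure returns false and resets the counter
    }
}

Note that ack() clears the counter on success, so only applications that keep failing count toward the cap.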
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
new file mode 100644
index 0000000..bd0eb85
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+
+public class SparkHistoryTopology {
+
+    private SparkHistoryCrawlConfig shConfig;
+
+    public SparkHistoryTopology(SparkHistoryCrawlConfig config) {
+        this.shConfig = config;
+    }
+
+    public TopologyBuilder getBuilder() {
+        TopologyBuilder builder = new TopologyBuilder();
+        String spoutName = "sparkHistoryJobSpout";
+        String boltName = "sparkHistoryJobBolt";
+        com.typesafe.config.Config config = this.shConfig.getConfig();
+        builder.setSpout(spoutName,
+            new FinishedSparkJobSpout(shConfig),
+            config.getInt("storm.parallelismConfig." + spoutName)
+        ).setNumTasks(config.getInt("storm.tasks." + spoutName));
+
+        builder.setBolt(boltName,
+            new SparkJobParseBolt(shConfig),
+            config.getInt("storm.parallelismConfig." + boltName)
+        ).setNumTasks(config.getInt("storm.tasks." + boltName)).shuffleGrouping(spoutName);
+        return builder;
+    }
+
+    public static void main(String[] args) {
+        try {
+            SparkHistoryCrawlConfig crawlConfig = new SparkHistoryCrawlConfig();
+
+            Config conf = new Config();
+            conf.setNumWorkers(crawlConfig.stormConfig.workerNo);
+            conf.setMessageTimeoutSecs(crawlConfig.stormConfig.timeoutSec);
+            //conf.setMaxSpoutPending(crawlConfig.stormConfig.spoutPending);
+            //conf.put(Config.TOPOLOGY_DEBUG, true);
+
+            if (crawlConfig.stormConfig.mode.equals("local")) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(
+                    crawlConfig.stormConfig.topologyName,
+                    conf,
+                    new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
+            } else {
+                StormSubmitter.submitTopology(
+                    crawlConfig.stormConfig.topologyName,
+                    conf,
+                    new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
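getBuilder() pulls the spout and bolt parallelism and task counts from the typesafe config under storm.parallelismConfig.* and storm.tasks.*, keyed by the component names sparkHistoryJobSpout and sparkHistoryJobBolt; the worker count, message timeout, and local-vs-cluster mode come from SparkHistoryCrawlConfig.stormConfig. A sketch of the minimal settings those getInt lookups expect, expressed inline via ConfigFactory.parseString; the numeric values are illustrative, and a real deployment would supply them through the module's configuration file instead:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class TopologyConfigSketch {
    public static void main(String[] args) {
        // Key names follow the config.getInt(...) calls in SparkHistoryTopology.getBuilder().
        Config config = ConfigFactory.parseString(
              "storm.parallelismConfig.sparkHistoryJobSpout = 1\n"
            + "storm.tasks.sparkHistoryJobSpout = 1\n"
            + "storm.parallelismConfig.sparkHistoryJobBolt = 4\n"
            + "storm.tasks.sparkHistoryJobBolt = 4\n");
        System.out.println(config.getInt("storm.parallelismConfig.sparkHistoryJobBolt")); // 4
    }
}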
