http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java new file mode 100644 index 0000000..211d6b7 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.running.entities; + +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +import java.util.List; + +@Table("eagleSparkRunningJobs") +@ColumnFamily("f") +@Prefix("sparkJob") +@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId","user", "queue"}) +@Partition({"site"}) +public class SparkJobEntity extends TaggedLogAPIEntity { + @Column("a") + private long submissionTime; + @Column("b") + private long completionTime; + @Column("c") + private int numStages = 0; + @Column("d") + private String status; + @Column("e") + private int numTask = 0; + @Column("f") + private int numActiveTasks = 0; + @Column("g") + private int numCompletedTasks = 0; + @Column("h") + private int numSkippedTasks = 0; + @Column("i") + private int numFailedTasks = 0; + @Column("j") + private int numActiveStages = 0; + @Column("k") + private int numCompletedStages = 0; + @Column("l") + private int numSkippedStages = 0; + @Column("m") + private int numFailedStages = 0; + @Column("n") + private List<Integer> stages; + + public List<Integer> getStages() { + return stages; + } + + public void setStages(List<Integer> stages) { + this.stages = stages; + this.valueChanged("stages"); + } + + public long getSubmissionTime() { + return submissionTime; + } + + public long getCompletionTime() { + return completionTime; + } + + public int getNumStages() { + return numStages; + } + + public String getStatus() { + return status; + } + + public int getNumTask() { + return numTask; + } + + public int getNumActiveTasks() { + return numActiveTasks; + } + + public int getNumCompletedTasks() { + return numCompletedTasks; + } + + public int getNumSkippedTasks() { + return 
numSkippedTasks; + } + + public int getNumFailedTasks() { + return numFailedTasks; + } + + public int getNumActiveStages() { + return numActiveStages; + } + + public int getNumCompletedStages() { + return numCompletedStages; + } + + public int getNumSkippedStages() { + return numSkippedStages; + } + + public int getNumFailedStages() { + return numFailedStages; + } + + public void setSubmissionTime(long submissionTime) { + this.submissionTime = submissionTime; + this.valueChanged("submissionTime"); + } + + public void setCompletionTime(long completionTime) { + this.completionTime = completionTime; + this.valueChanged("completionTime"); + } + + public void setNumStages(int numStages) { + this.numStages = numStages; + this.valueChanged("numStages"); + } + + public void setStatus(String status) { + this.status = status; + this.valueChanged("status"); + } + + public void setNumTask(int numTask) { + this.numTask = numTask; + this.valueChanged("numTask"); + } + + public void setNumActiveTasks(int numActiveTasks) { + this.numActiveTasks = numActiveTasks; + this.valueChanged("numActiveTasks"); + } + + public void setNumCompletedTasks(int numCompletedTasks) { + this.numCompletedTasks = numCompletedTasks; + this.valueChanged("numCompletedTasks"); + } + + public void setNumSkippedTasks(int numSkippedTasks) { + this.numSkippedTasks = numSkippedTasks; + this.valueChanged("numSkippedTasks"); + } + + public void setNumFailedTasks(int numFailedTasks) { + this.numFailedTasks = numFailedTasks; + this.valueChanged("numFailedTasks"); + } + + public void setNumActiveStages(int numActiveStages) { + this.numActiveStages = numActiveStages; + this.valueChanged("numActiveStages"); + } + + public void setNumCompletedStages(int numCompletedStages) { + this.numCompletedStages = numCompletedStages; + this.valueChanged("numCompletedStages"); + } + + public void setNumSkippedStages(int numSkippedStages) { + this.numSkippedStages = numSkippedStages; + this.valueChanged("numSkippedStages"); + } + + 
public void setNumFailedStages(int numFailedStages) { + this.numFailedStages = numFailedStages; + this.valueChanged("numFailedStages"); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java new file mode 100644 index 0000000..0194132 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.running.entities; + +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +@Table("eagleSparkRunningStages") +@ColumnFamily("f") +@Prefix("sparkStage") +@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"}) +@Partition({"site"}) +public class SparkStageEntity extends TaggedLogAPIEntity { + @Column("a") + private String status; + @Column("b") + private int numActiveTasks = 0; + @Column("c") + private int numCompletedTasks = 0; + @Column("d") + private int numFailedTasks = 0; + @Column("e") + private long executorRunTime = 0L; + @Column("f") + private long inputBytes = 0L; + @Column("g") + private long inputRecords = 0L; + @Column("h") + private long outputBytes = 0L; + @Column("i") + private long outputRecords = 0L; + @Column("j") + private long shuffleReadBytes = 0L; + @Column("k") + private long shuffleReadRecords = 0L; + @Column("l") + private long shuffleWriteBytes = 0L; + @Column("m") + private long shuffleWriteRecords = 0L; + @Column("n") + private long memoryBytesSpilled = 0L; + @Column("o") + private long diskBytesSpilled = 0L; + @Column("p") + private String name; + @Column("q") + private String schedulingPool; + @Column("r") + private long submitTime; + @Column("s") + private long completeTime; + @Column("t") + private int numTasks; + @Column("u") + private long executorDeserializeTime; + @Column("v") + private long resultSize; + @Column("w") + private long jvmGcTime; + @Column("x") + private long resultSerializationTime; + + public String getStatus() { + return status; + } + + public int getNumActiveTasks() { + return numActiveTasks; + } + + public int 
getNumCompletedTasks() { + return numCompletedTasks; + } + + public int getNumFailedTasks() { + return numFailedTasks; + } + + public long getExecutorRunTime() { + return executorRunTime; + } + + public long getInputBytes() { + return inputBytes; + } + + public long getInputRecords() { + return inputRecords; + } + + public long getOutputBytes() { + return outputBytes; + } + + public long getOutputRecords() { + return outputRecords; + } + + public long getShuffleReadBytes() { + return shuffleReadBytes; + } + + public long getShuffleReadRecords() { + return shuffleReadRecords; + } + + public long getShuffleWriteBytes() { + return shuffleWriteBytes; + } + + public long getShuffleWriteRecords() { + return shuffleWriteRecords; + } + + public long getMemoryBytesSpilled() { + return memoryBytesSpilled; + } + + public long getDiskBytesSpilled() { + return diskBytesSpilled; + } + + public String getName() { + return name; + } + + public String getSchedulingPool() { + return schedulingPool; + } + + public long getSubmitTime() { + return submitTime; + } + + public long getCompleteTime() { + return completeTime; + } + + public int getNumTasks() { + return numTasks; + } + + public long getExecutorDeserializeTime() { + return executorDeserializeTime; + } + + public long getResultSize() { + return resultSize; + } + + public long getJvmGcTime() { + return jvmGcTime; + } + + public long getResultSerializationTime() { + return resultSerializationTime; + } + + public void setStatus(String status) { + this.status = status; + this.valueChanged("status"); + } + + public void setNumActiveTasks(int numActiveTasks) { + this.numActiveTasks = numActiveTasks; + this.valueChanged("numActiveTasks"); + } + + public void setNumCompletedTasks(int numCompletedTasks) { + this.numCompletedTasks = numCompletedTasks; + this.valueChanged("numCompletedTasks"); + } + + public void setNumFailedTasks(int numFailedTasks) { + this.numFailedTasks = numFailedTasks; + this.valueChanged("numFailedTasks"); + } + + 
public void setExecutorRunTime(long executorRunTime) { + this.executorRunTime = executorRunTime; + this.valueChanged("executorRunTime"); + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + this.valueChanged("inputBytes"); + } + + public void setInputRecords(long inputRecords) { + this.inputRecords = inputRecords; + this.valueChanged("inputRecords"); + } + + public void setOutputBytes(long outputBytes) { + this.outputBytes = outputBytes; + this.valueChanged("outputBytes"); + } + + public void setOutputRecords(long outputRecords) { + this.outputRecords = outputRecords; + this.valueChanged("outputRecords"); + } + + public void setShuffleReadBytes(long shuffleReadBytes) { + this.shuffleReadBytes = shuffleReadBytes; + this.valueChanged("shuffleReadBytes"); + } + + public void setShuffleReadRecords(long shuffleReadRecords) { + this.shuffleReadRecords = shuffleReadRecords; + this.valueChanged("shuffleReadRecords"); + } + + public void setShuffleWriteBytes(long shuffleWriteBytes) { + this.shuffleWriteBytes = shuffleWriteBytes; + this.valueChanged("shuffleWriteBytes"); + } + + public void setShuffleWriteRecords(long shuffleWriteRecords) { + this.shuffleWriteRecords = shuffleWriteRecords; + this.valueChanged("shuffleWriteRecords"); + } + + public void setMemoryBytesSpilled(long memoryBytesSpilled) { + this.memoryBytesSpilled = memoryBytesSpilled; + this.valueChanged("memoryBytesSpilled"); + } + + public void setDiskBytesSpilled(long diskBytesSpilled) { + this.diskBytesSpilled = diskBytesSpilled; + this.valueChanged("diskBytesSpilled"); + } + + public void setName(String name) { + this.name = name; + this.valueChanged("name"); + } + + public void setSchedulingPool(String schedulingPool) { + this.schedulingPool = schedulingPool; + this.valueChanged("schedulingPool"); + } + + public void setSubmitTime(long submitTime) { + this.submitTime = submitTime; + this.valueChanged("submitTime"); + } + + public void setCompleteTime(long completeTime) { + 
this.completeTime = completeTime; + this.valueChanged("completeTime"); + } + + public void setNumTasks(int numTasks) { + this.numTasks = numTasks; + valueChanged("numTasks"); + } + + public void setExecutorDeserializeTime(long executorDeserializeTime) { + this.executorDeserializeTime = executorDeserializeTime; + valueChanged("executorDeserializeTime"); + } + + public void setResultSize(long resultSize) { + this.resultSize = resultSize; + valueChanged("resultSize"); + } + + public void setJvmGcTime(long jvmGcTime) { + this.jvmGcTime = jvmGcTime; + valueChanged("jvmGcTime"); + } + + public void setResultSerializationTime(long resultSerializationTime) { + this.resultSerializationTime = resultSerializationTime; + valueChanged("resultSerializationTime"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java new file mode 100644 index 0000000..6522c3c --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.running.entities; + +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +@Table("eagleSparkRunningTasks") +@ColumnFamily("f") +@Prefix("sparkTask") +@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"}) +@Partition({"site"}) +public class SparkTaskEntity extends TaggedLogAPIEntity { + @Column("a") + private int taskId; + @Column("b") + private long launchTime; + @Column("c") + private String executorId; + @Column("d") + private String host; + @Column("e") + private String taskLocality; + @Column("f") + private boolean speculative; + @Column("g") + private long executorDeserializeTime; + @Column("h") + private long executorRunTime; + @Column("i") + private long resultSize; + @Column("j") + private long jvmGcTime; + @Column("k") + private long resultSerializationTime; + @Column("l") + private long memoryBytesSpilled; + @Column("m") + private long diskBytesSpilled; + @Column("n") + private long inputBytes; + @Column("o") + private long inputRecords; + @Column("p") + private long outputBytes; + @Column("q") + private long outputRecords; + @Column("r") + private long shuffleReadRemoteBytes; + @Column("x") + private long 
shuffleReadLocalBytes; + @Column("s") + private long shuffleReadRecords; + @Column("t") + private long shuffleWriteBytes; + @Column("u") + private long shuffleWriteRecords; + @Column("v") + private boolean failed; + + public int getTaskId() { + return taskId; + } + + public long getLaunchTime() { + return launchTime; + } + + public String getExecutorId() { + return executorId; + } + + public String getHost() { + return host; + } + + public String getTaskLocality() { + return taskLocality; + } + + public boolean isSpeculative() { + return speculative; + } + + public long getExecutorDeserializeTime() { + return executorDeserializeTime; + } + + public long getExecutorRunTime() { + return executorRunTime; + } + + public long getResultSize() { + return resultSize; + } + + public long getJvmGcTime() { + return jvmGcTime; + } + + public long getResultSerializationTime() { + return resultSerializationTime; + } + + public long getMemoryBytesSpilled() { + return memoryBytesSpilled; + } + + public long getDiskBytesSpilled() { + return diskBytesSpilled; + } + + public long getInputBytes() { + return inputBytes; + } + + public long getInputRecords() { + return inputRecords; + } + + public long getOutputBytes() { + return outputBytes; + } + + public long getOutputRecords() { + return outputRecords; + } + + public long getShuffleReadRecords() { + return shuffleReadRecords; + } + + public long getShuffleWriteBytes() { + return shuffleWriteBytes; + } + + public long getShuffleWriteRecords() { + return shuffleWriteRecords; + } + + public boolean isFailed() { + return failed; + } + + public long getShuffleReadRemoteBytes() { + return shuffleReadRemoteBytes; + } + + public long getShuffleReadLocalBytes() { + return shuffleReadLocalBytes; + } + + public void setFailed(boolean failed) { + this.failed = failed; + valueChanged("failed"); + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + valueChanged("taskId"); + } + + public void setLaunchTime(long launchTime) { + 
this.launchTime = launchTime; + valueChanged("launchTime"); + } + + public void setExecutorId(String executorId) { + this.executorId = executorId; + valueChanged("executorId"); + } + + public void setHost(String host) { + this.host = host; + this.valueChanged("host"); + } + + public void setTaskLocality(String taskLocality) { + this.taskLocality = taskLocality; + this.valueChanged("taskLocality"); + } + + public void setSpeculative(boolean speculative) { + this.speculative = speculative; + this.valueChanged("speculative"); + } + + public void setExecutorDeserializeTime(long executorDeserializeTime) { + this.executorDeserializeTime = executorDeserializeTime; + this.valueChanged("executorDeserializeTime"); + } + + public void setExecutorRunTime(long executorRunTime) { + this.executorRunTime = executorRunTime; + this.valueChanged("executorRunTime"); + } + + public void setResultSize(long resultSize) { + this.resultSize = resultSize; + this.valueChanged("resultSize"); + } + + public void setJvmGcTime(long jvmGcTime) { + this.jvmGcTime = jvmGcTime; + this.valueChanged("jvmGcTime"); + } + + public void setResultSerializationTime(long resultSerializationTime) { + this.resultSerializationTime = resultSerializationTime; + this.valueChanged("resultSerializationTime"); + } + + public void setMemoryBytesSpilled(long memoryBytesSpilled) { + this.memoryBytesSpilled = memoryBytesSpilled; + this.valueChanged("memoryBytesSpilled"); + } + + public void setDiskBytesSpilled(long diskBytesSpilled) { + this.diskBytesSpilled = diskBytesSpilled; + this.valueChanged("diskBytesSpilled"); + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + this.valueChanged("inputBytes"); + } + + public void setInputRecords(long inputRecords) { + this.inputRecords = inputRecords; + this.valueChanged("inputRecords"); + } + + public void setOutputBytes(long outputBytes) { + this.outputBytes = outputBytes; + this.valueChanged("outputBytes"); + } + + public void 
setOutputRecords(long outputRecords) { + this.outputRecords = outputRecords; + this.valueChanged("outputRecords"); + } + + + + public void setShuffleReadRecords(long shuffleReadRecords) { + this.shuffleReadRecords = shuffleReadRecords; + this.valueChanged("shuffleReadRecords"); + } + + public void setShuffleWriteBytes(long shuffleWriteBytes) { + this.shuffleWriteBytes = shuffleWriteBytes; + this.valueChanged("shuffleWriteBytes"); + } + + public void setShuffleWriteRecords(long shuffleWriteRecords) { + this.shuffleWriteRecords = shuffleWriteRecords; + this.valueChanged("shuffleWriteRecords"); + } + + public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) { + this.shuffleReadRemoteBytes = shuffleReadRemoteBytes; + this.valueChanged("shuffleReadRemoteBytes"); + } + + public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) { + this.shuffleReadLocalBytes = shuffleReadLocalBytes; + this.valueChanged("shuffleReadLocalBytes"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java index 0fc74d7..284eeee 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java @@ -82,8 +82,6 @@ public class SparkHistoryJobAppConfig implements Serializable { this.eagleInfo.host = config.getString("eagleProps.eagle.service.host"); this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port"); - this.stormConfig.topologyName = 
config.getString("storm.name"); - this.stormConfig.workerNo = config.getInt("storm.worker.num"); this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec"); this.stormConfig.spoutPending = config.getInt("storm.pendingSpout"); this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval"); @@ -117,9 +115,7 @@ public class SparkHistoryJobAppConfig implements Serializable { } public static class StormConfig implements Serializable { - public int workerNo; public int timeoutSec; - public String topologyName; public int spoutPending; public int spoutCrawlInterval; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java new file mode 100644 index 0000000..b73b52e --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.eagle.jpm.spark.history.crawl;

import java.io.InputStream;

/**
 * Callback that consumes a raw Spark job-history input stream.
 *
 * <p>NOTE(review): this contract does not state who is responsible for closing
 * the stream — confirm ownership with each caller. (By contrast,
 * {@code JHFParserBase#parse} documents that it closes its stream.)
 */
public interface JHFInputStreamReader {
    void read(InputStream is) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
new file mode 100644
index 0000000..047e2d5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
@@ -0,0 +1,29 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.eagle.jpm.spark.history.crawl;

import java.io.InputStream;

/**
 * Base contract for Spark job-history file parsers.
 */
public interface JHFParserBase {
    /**
     * Parses the given job-history stream. This method will ensure that the
     * input stream is closed before it returns, even when parsing fails.
     *
     * @param is the job-history stream to parse; closed by this method
     * @throws Exception if reading or parsing the stream fails
     */
    void parse(InputStream is) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
new file mode 100644
index 0000000..571620a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -0,0 +1,713 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.eagle.jpm.spark.history.crawl; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.commons.lang.ArrayUtils; +import org.apache.eagle.jpm.spark.entity.*; +import org.apache.eagle.jpm.util.*; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.service.client.EagleServiceClientException; +import org.apache.eagle.service.client.impl.EagleServiceBaseClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +public class JHFSparkEventReader { + private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class); + + private static final int FLUSH_LIMIT = 500; + private long firstTaskLaunchTime; + private long lastEventTime; + + private Map<String, SparkExecutor> executors; + private SparkApp app; + private Map<Integer, SparkJob> jobs; + private Map<String, SparkStage> stages; + private Map<Integer, Set<String>> jobStageMap; + private Map<Long, SparkTask> tasks; + private EagleServiceClientImpl client; + private Map<String, Map<Integer, Boolean>> stageTaskStatusMap; + + private List<TaggedLogAPIEntity> createEntities; + + private Config conf; + + public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) { + app = new SparkApp(); + app.setTags(new HashMap<String, String>(baseTags)); + app.setYarnState(info.getState()); + app.setYarnStatus(info.getFinalStatus()); + createEntities = new ArrayList<>(); + jobs = new HashMap<Integer, SparkJob>(); + stages = new HashMap<String, SparkStage>(); + jobStageMap = new HashMap<Integer, Set<String>>(); + tasks = new HashMap<Long, SparkTask>(); + executors = new HashMap<String, SparkExecutor>(); + stageTaskStatusMap = new HashMap<>(); + conf = ConfigFactory.load(); + 
this.initiateClient(); + } + + public SparkApp getApp() { + return this.app; + } + + public void read(JSONObject eventObj) { + String eventType = (String) eventObj.get("Event"); + if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationStart.toString())) { + handleAppStarted(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerEnvironmentUpdate.toString())) { + handleEnvironmentSet(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorAdded.toString())) { + handleExecutorAdd(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerAdded.toString())) { + handleBlockManagerAdd(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobStart.toString())) { + handleJobStart(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageSubmitted.toString())) { + handleStageSubmit(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskStart.toString())) { + handleTaskStart(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskEnd.toString())) { + handleTaskEnd(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageCompleted.toString())) { + handleStageComplete(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobEnd.toString())) { + handleJobEnd(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorRemoved.toString())) { + handleExecutorRemoved(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationEnd.toString())) { + handleAppEnd(eventObj); + } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerRemoved.toString())) { + //nothing to do now + } else { + LOG.info("Not registered event type:" + eventType); + } + + } + + private void handleEnvironmentSet(JSONObject event) { + app.setConfig(new JobConfig()); + JSONObject 
sparkProps = (JSONObject) event.get("Spark Properties"); + + String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*"); + String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port", + "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory", + "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"}; + String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props); + for (String prop : jobConf) { + if (sparkProps.containsKey(prop)) { + app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop)); + } + } + } + + private Object getConfigVal(JobConfig config, String configName, String type) { + if (config.getConfig().containsKey(configName)) { + Object val = config.getConfig().get(configName); + if (type.equalsIgnoreCase(Integer.class.getName())) { + return Integer.parseInt((String) val); + } else { + return val; + } + } else { + if (type.equalsIgnoreCase(Integer.class.getName())) { + return conf.getInt("spark.defaultVal." + configName); + } else { + return conf.getString("spark.defaultVal." 
+ configName); + } + } + } + + private boolean isClientMode(JobConfig config) { + return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client"); + } + + private void handleAppStarted(JSONObject event) { + //need update all entities tag before app start + List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); + entities.addAll(this.executors.values()); + entities.add(this.app); + + long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); + for (TaggedLogAPIEntity entity : entities) { + entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID")); + entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name")); + // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1. + String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID"); + entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId); + // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text + // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text + entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null)); + entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User")); + + entity.setTimestamp(appStartTime); + } + + this.app.setStartTime(appStartTime); + this.lastEventTime = appStartTime; + } + + private void handleExecutorAdd(JSONObject event) { + String executorID = (String) event.get("Executor ID"); + long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); + this.lastEventTime = executorAddTime; + SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime); + + JSONObject executorInfo = 
JSONUtils.getJSONObject(event, "Executor Info"); + + } + + private void handleBlockManagerAdd(JSONObject event) { + long maxMemory = JSONUtils.getLong(event, "Maximum Memory"); + long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime); + this.lastEventTime = timestamp; + JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID"); + String executorID = JSONUtils.getString(blockInfo, "Executor ID"); + String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port"); + + SparkExecutor executor = this.initiateExecutor(executorID, timestamp); + executor.setMaxMemory(maxMemory); + executor.setHostPort(hostAndPort); + } + + private void handleTaskStart(JSONObject event) { + this.initializeTask(event); + } + + private void handleTaskEnd(JSONObject event) { + JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info"); + long taskId = JSONUtils.getLong(taskInfo, "Task ID"); + SparkTask task = tasks.get(taskId); + if (task == null) { + return; + } + + task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed")); + JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics"); + if (null != taskMetrics) { + task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime)); + task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime)); + task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime)); + task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size")); + task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime)); + task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled")); + task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled")); + + JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics"); + if (null != inputMetrics) { + task.setInputBytes(JSONUtils.getLong(inputMetrics, 
"Bytes Read")); + task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read")); + } + + JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics"); + if (null != outputMetrics) { + task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written")); + task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written")); + } + + JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics"); + if (null != shuffleWriteMetrics) { + task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written")); + task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written")); + } + + JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics"); + if (null != shuffleReadMetrics) { + task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read")); + task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read")); + task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read")); + } + } else { + //for tasks success without task metrics, save in the end if no other information + if (!task.isFailed()) { + return; + } + } + + aggregateToStage(task); + aggregateToExecutor(task); + tasks.remove(taskId); + this.flushEntities(task, false); + } + + + private SparkTask initializeTask(JSONObject event) { + SparkTask task = new SparkTask(); + task.setTags(new HashMap<>(this.app.getTags())); + task.setTimestamp(app.getTimestamp()); + + task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID"))); + task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID"))); + + JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info"); + long taskId = JSONUtils.getLong(taskInfo, "Task ID"); + task.setTaskId(taskId); + + 
task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index"))); + task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt"))); + long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime); + this.lastEventTime = launchTime; + if (taskId == 0) { + this.setFirstTaskLaunchTime(launchTime); + } + task.setLaunchTime(launchTime); + task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID")); + task.setHost(JSONUtils.getString(taskInfo, "Host")); + task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality")); + task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative")); + + tasks.put(task.getTaskId(), task); + return task; + } + + private void setFirstTaskLaunchTime(long launchTime) { + this.firstTaskLaunchTime = launchTime; + } + + private void handleJobStart(JSONObject event) { + SparkJob job = new SparkJob(); + job.setTags(new HashMap<>(this.app.getTags())); + job.setTimestamp(app.getTimestamp()); + + int jobId = JSONUtils.getInt(event, "Job ID"); + job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId)); + long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime); + job.setSubmissionTime(submissionTime); + this.lastEventTime = submissionTime; + + //for complete application, no active stages/tasks + job.setNumActiveStages(0); + job.setNumActiveTasks(0); + + this.jobs.put(jobId, job); + this.jobStageMap.put(jobId, new HashSet<String>()); + + JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos"); + int stagesSize = (stages == null ? 
0 : stages.size()); + job.setNumStages(stagesSize); + for (int i = 0; i < stagesSize; i++) { + JSONObject stageInfo = (JSONObject) stages.get(i); + int stageId = JSONUtils.getInt(stageInfo, "Stage ID"); + int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID"); + String stageName = JSONUtils.getString(stageInfo, "Stage Name"); + int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks"); + this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks); + } + } + + private void handleStageSubmit(JSONObject event) { + JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info"); + int stageId = JSONUtils.getInt(stageInfo, "Stage ID"); + int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID"); + String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)); + stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>()); + + if (!stages.containsKey(key)) { + //may be further attempt for one stage + String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0"); + if (stages.containsKey(baseAttempt)) { + SparkStage stage = stages.get(baseAttempt); + String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()); + + String stageName = JSONUtils.getString(event, "Stage Name"); + int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks"); + this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks); + } + } + } + + private void handleStageComplete(JSONObject event) { + JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info"); + int stageId = JSONUtils.getInt(stageInfo, "Stage ID"); + int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID"); + String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)); + SparkStage stage = stages.get(key); + + // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0. 
+ Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime); + + stage.setSubmitTime(submissionTime); + + long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime); + stage.setCompleteTime(completeTime); + this.lastEventTime = completeTime; + + if (stageInfo != null && stageInfo.containsKey("Failure Reason")) { + stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString()); + } else { + stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString()); + } + } + + private void handleExecutorRemoved(JSONObject event) { + String executorID = JSONUtils.getString(event, "Executor ID"); + SparkExecutor executor = executors.get(executorID); + long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); + executor.setEndTime(removedTime); + this.lastEventTime = removedTime; + } + + private void handleJobEnd(JSONObject event) { + int jobId = JSONUtils.getInt(event, "Job ID"); + SparkJob job = jobs.get(jobId); + + long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime); + job.setCompletionTime(completionTime); + this.lastEventTime = completionTime; + + JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result"); + String result = JSONUtils.getString(jobResult, "Result"); + if (result.equalsIgnoreCase("JobSucceeded")) { + job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString()); + } else { + job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString()); + } + } + + private void handleAppEnd(JSONObject event) { + long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime); + app.setEndTime(endTime); + this.lastEventTime = endTime; + } + + public void clearReader() throws Exception { + //clear tasks + for (SparkTask task : tasks.values()) { + LOG.info("Task {} does not have result or no task metrics.", task.getTaskId()); + task.setFailed(true); + aggregateToStage(task); + aggregateToExecutor(task); + this.flushEntities(task, 
false); + } + + List<SparkStage> needStoreStages = new ArrayList<>(); + for (SparkStage stage : this.stages.values()) { + int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); + if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) { + SparkJob job = this.jobs.get(jobId); + job.setNumSkippedStages(job.getNumSkippedStages() + 1); + job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks()); + } else { + this.aggregateToJob(stage); + this.aggregateStageToApp(stage); + needStoreStages.add(stage); + } + String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); + String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); + this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId)); + } + + this.flushEntities(needStoreStages, false); + for (SparkJob job : jobs.values()) { + this.aggregateJobToApp(job); + } + this.flushEntities(jobs.values(), false); + + app.setExecutors(executors.values().size()); + + long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName())); + long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig()) + ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName()) + : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName())); + + int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName()); + int driverCore = this.isClientMode(app.getConfig()) + ? 
(Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName()) + : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName()); + + long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead"); + long driverMemoryOverhead = this.isClientMode(app.getConfig()) + ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead") + : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead"); + + app.setExecMemoryBytes(executorMemory); + app.setDriveMemoryBytes(driverMemory); + app.setExecutorCores(executorCore); + app.setDriverCores(driverCore); + app.setExecutorMemoryOverhead(executorMemoryOverhead); + app.setDriverMemoryOverhead(driverMemoryOverhead); + + for (SparkExecutor executor : executors.values()) { + String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString()); + if (executorID.equalsIgnoreCase("driver")) { + executor.setExecMemoryBytes(driverMemory); + executor.setCores(driverCore); + executor.setMemoryOverhead(driverMemoryOverhead); + } else { + executor.setExecMemoryBytes(executorMemory); + executor.setCores(executorCore); + executor.setMemoryOverhead(executorMemoryOverhead); + } + if (app.getEndTime() <= 0L) { + app.setEndTime(this.lastEventTime); + } + if (executor.getEndTime() <= 0L) { + executor.setEndTime(app.getEndTime()); + } + this.aggregateExecutorToApp(executor); + } + this.flushEntities(executors.values(), false); + //spark code...tricky + app.setSkippedTasks(app.getCompleteTasks()); + this.flushEntities(app, true); + } + + private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) { + long result = 0L; + String fieldValue = config.getConfig().get(fieldName); + if (fieldValue != null) { + result = Utils.parseMemory(fieldValue + "m"); + if (result == 0L) { + result = Utils.parseMemory(fieldValue); + } + } + + if 
(result == 0L) {
            result = Math.max(
                    Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
        }
        return result;
    }

    // Adds one executor's wall-clock lifetime to the app total, clamped at zero.
    private void aggregateExecutorToApp(SparkExecutor executor) {
        long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
        if (totalExecutorTime < 0L) {
            totalExecutorTime = 0L;
        }
        app.setTotalExecutorTime(totalExecutorTime);
    }

    // Rolls one job's counters up into the application entity.
    private void aggregateJobToApp(SparkJob job) {
        //aggregate job level metrics
        app.setNumJobs(app.getNumJobs() + 1);
        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
        app.setTotalStages(app.getTotalStages() + job.getNumStages());
        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
    }

    // Rolls one stage's task-level metrics up into the application entity.
    private void aggregateStageToApp(SparkStage stage) {
        //aggregate task level metrics
        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
        app.setResultSize(app.getResultSize() + stage.getResultSize());
        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords()); + app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes()); + app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords()); + app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes()); + app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords()); + app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes()); + } + + private void aggregateToStage(SparkTask task) { + String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()); + String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()); + String key = this.generateStageKey(stageId, stageAttemptId); + SparkStage stage = stages.get(key); + + stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled()); + stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled()); + stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime()); + stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime()); + stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime()); + stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime()); + stage.setResultSize(stage.getResultSize() + task.getResultSize()); + stage.setInputRecords(stage.getInputRecords() + task.getInputRecords()); + stage.setInputBytes(stage.getInputBytes() + task.getInputBytes()); + stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords()); + stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes()); + stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords()); + stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes()); + stage.setShuffleReadRecords(stage.getShuffleReadRecords() + 
task.getShuffleReadRecords()); + long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); + stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes); + + boolean success = !task.isFailed(); + + Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString())); + if (stageTaskStatusMap.get(key).containsKey(taskIndex)) { + //has previous task attempt, retrieved from task index in one stage + boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex); + success = previousResult || success; + if (previousResult != success) { + stage.setNumFailedTasks(stage.getNumFailedTasks() - 1); + stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); + stageTaskStatusMap.get(key).put(taskIndex, success); + } + } else { + if (success) { + stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1); + } else { + stage.setNumFailedTasks(stage.getNumFailedTasks() + 1); + } + stageTaskStatusMap.get(key).put(taskIndex, success); + } + + } + + private void aggregateToExecutor(SparkTask task) { + String executorId = task.getExecutorId(); + SparkExecutor executor = executors.get(executorId); + + if (null != executor) { + executor.setTotalTasks(executor.getTotalTasks() + 1); + if (task.isFailed()) { + executor.setFailedTasks(executor.getFailedTasks() + 1); + } else { + executor.setCompletedTasks(executor.getCompletedTasks() + 1); + } + long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes(); + executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes); + executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime()); + executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes()); + executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes()); + executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime()); + } + + } + + 
private void aggregateToJob(SparkStage stage) { + int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString())); + SparkJob job = jobs.get(jobId); + job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks()); + job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks()); + job.setNumTask(job.getNumTask() + stage.getNumTasks()); + + + if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) { + //if multiple attempts succeed, just count one + if (!hasStagePriorAttemptSuccess(stage)) { + job.setNumCompletedStages(job.getNumCompletedStages() + 1); + } + } else { + job.setNumFailedStages(job.getNumFailedStages() + 1); + } + } + + private boolean hasStagePriorAttemptSuccess(SparkStage stage) { + int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString())); + for (int i = 0; i < stageAttemptId; i++) { + SparkStage previousStage = stages.get(this.generateStageKey( + stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i))); + if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) { + return true; + } + } + return false; + } + + + private String generateStageKey(String stageId, String stageAttemptId) { + return stageId + "-" + stageAttemptId; + } + + private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) { + SparkStage stage = new SparkStage(); + stage.setTags(new HashMap<>(this.app.getTags())); + stage.setTimestamp(app.getTimestamp()); + stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId)); + stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId)); + stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId)); + stage.setName(name); + stage.setNumActiveTasks(0); + 
stage.setNumTasks(numTasks); + stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ? + "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool")); + + String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)); + stages.put(stageKey, stage); + this.jobStageMap.get(jobId).add(stageKey); + } + + + private SparkExecutor initiateExecutor(String executorID, long startTime) { + if (!executors.containsKey(executorID)) { + SparkExecutor executor = new SparkExecutor(); + executor.setTags(new HashMap<>(this.app.getTags())); + executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID); + executor.setStartTime(startTime); + executor.setTimestamp(app.getTimestamp()); + + this.executors.put(executorID, executor); + } + + return this.executors.get(executorID); + } + + private String getNormalizedName(String jobName, String assignedName) { + if (null != assignedName) { + return assignedName; + } else { + return JobNameNormalization.getInstance().normalize(jobName); + } + } + + private void flushEntities(Object entity, boolean forceFlush) { + this.flushEntities(Collections.singletonList(entity), forceFlush); + } + + private void flushEntities(Collection entities, boolean forceFlush) { + this.createEntities.addAll(entities); + + if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) { + try { + this.doFlush(this.createEntities); + this.createEntities.clear(); + } catch (Exception e) { + LOG.error("Fail to flush entities", e); + } + + } + } + + private EagleServiceBaseClient initiateClient() { + String host = conf.getString("eagleProps.eagle.service.host"); + int port = conf.getInt("eagleProps.eagle.service.port"); + String userName = conf.getString("eagleProps.eagle.service.username"); + String pwd = conf.getString("eagleProps.eagle.service.password"); + client = new EagleServiceClientImpl(host, port, userName, pwd); + int timeout = 
conf.getInt("eagleProps.eagle.service.read.timeout"); + client.getJerseyClient().setReadTimeout(timeout * 1000); + + return client; + } + + private void doFlush(List entities) throws IOException, EagleServiceClientException { + client.create(entities); + int size = (entities == null ? 0 : entities.size()); + LOG.info("finish flushing entities of total number " + size); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java new file mode 100644 index 0000000..b1dd09c --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.spark.history.crawl; + + +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class JHFSparkParser implements JHFParserBase { + + private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class); + + private boolean isValidJson; + + private JHFSparkEventReader eventReader; + + public JHFSparkParser(JHFSparkEventReader reader) { + this.eventReader = reader; + } + + @Override + public void parse(InputStream is) throws Exception { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) { + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + isValidJson = true; + JSONObject eventObj = parseAndValidateJSON(line); + if (isValidJson) { + try { + this.eventReader.read(eventObj); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + this.eventReader.clearReader(); + } + } + + private JSONObject parseAndValidateJSON(String line) { + JSONObject eventObj = null; + JSONParser parser = new JSONParser(); + try { + eventObj = (JSONObject) parser.parse(line); + } catch (ParseException ex) { + isValidJson = false; + logger.error(String.format("Invalid json string. 
Fail to parse %s.", line), ex); + } + return eventObj; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java new file mode 100644 index 0000000..c206b71 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java @@ -0,0 +1,69 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.eagle.jpm.spark.history.crawl; + +public class SparkApplicationInfo { + + private String state; + private String finalStatus; + private String queue; + private String name; + private String user; + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public String getFinalStatus() { + return finalStatus; + } + + public void setFinalStatus(String finalStatus) { + this.finalStatus = finalStatus; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java new file mode 100644 index 0000000..0144410 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.history.crawl; + +import org.apache.eagle.jpm.util.SparkJobTagName; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader { + + private String site; + private SparkApplicationInfo app; + + + public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) { + this.site = site; + this.app = app; + } + + @Override + public void read(InputStream is) throws Exception { + Map<String, String> baseTags = new HashMap<>(); + baseTags.put(SparkJobTagName.SITE.toString(), site); + baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue()); + JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app)); + parser.parse(is); + } + + public static void main(String[] args) throws Exception { + SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo()); + impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1"))); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java index 9fafc1f..0bb65df 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java @@ -19,7 +19,7 @@ package org.apache.eagle.jpm.spark.history.status; -import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo; +import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo; import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -233,10 +233,11 @@ public class JobHistoryZKStateManager { curator.setData().forPath(path, status.toString().getBytes("UTF-8")); } } else { - LOG.error("Failed to update for application with path: " + path); + LOG.warn("failed to update with status {} due to path {} not existing ", status, path); + //throw new RuntimeException("Failed to update for application with path: " + path); } } catch (Exception e) { - LOG.error("fail to update application status", e); + LOG.error("fail to update application status as {}", status, e); throw new RuntimeException(e); } finally { try { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java index e88c62f..0351de3 100644 --- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java @@ -19,9 +19,9 @@ package org.apache.eagle.jpm.spark.history.storm; -import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader; -import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo; -import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl; +import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader; +import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo; +import org.apache.eagle.jpm.spark.history.crawl.SparkFilesystemInputStreamReaderImpl; import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig; import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager; import org.apache.eagle.jpm.spark.history.status.ZKStateConstant; @@ -106,9 +106,12 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt { zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED); LOG.info("Successfully parse application {}", appId); collector.ack(tuple); + } catch (RuntimeException e) { + LOG.warn("fail to process application {} due to RuntimeException, ignore it", appId, e); + zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED); + collector.ack(tuple); } catch (Exception e) { - LOG.error("Fail to process application {}", appId, e); - zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED); + LOG.error("Fail to process application {}, and retry", appId, e); collector.fail(tuple); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java index 5602b4c..4c50607 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java @@ -91,10 +91,11 @@ public class SparkHistoryJobSpout extends BaseRichSpout { LOG.info("emit " + appId); zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE); } - LOG.info("{} apps sent.", appIds.size()); if (appIds.isEmpty()) { - this.takeRest(60); + this.takeRest(10); + } else { + LOG.info("{} apps sent.", appIds.size()); } } catch (Exception e) { LOG.error("Fail to run next tuple", e); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml index 26842b8..b94c603 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml @@ -153,24 +153,6 @@ <value>http://sandbox.hortonworks.com:8088</value> </property> <property> - <name>storm.mode</name> - <displayName>mode</displayName> - <description>Storm Mode: 
local or cluster</description> - <value>local</value> - </property> - <property> - <name>storm.worker.num</name> - <displayName>worker.num</displayName> - <description>The number of workers</description> - <value>2</value> - </property> - <property> - <name>name</name> - <displayName>name</displayName> - <description>Name of the topology</description> - <value>sparkHistoryJob</value> - </property> - <property> <name>storm.messageTimeoutSec</name> <displayName>messageTimeoutSec</displayName> <description>Message timeout (in seconds)</description> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf index 58dd552..4c22b15 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf @@ -15,6 +15,9 @@ { + "appId": "sparkHistoryJob", + "mode": "CLUSTER", + "workers" : 3, "basic":{ "cluster":"sandbox", "dataCenter":"sandbox", @@ -45,8 +48,6 @@ } }, "storm":{ - worker.num: 2, - "name":"sparkHistoryJob", "messageTimeoutSec": 3000, "pendingSpout": 1000, "spoutCrawlInterval": 10000,#in ms @@ -72,7 +73,5 @@ spark.yarn.am.memoryOverhead.factor: 10, spark.yarn.overhead.min: "384m" } - }, - "appId": "sparkHistoryJob", - "mode": "CLUSTER" + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java deleted file mode 100644 index 5d1cfaa..0000000 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.jpm.spark.running.entities; - -import org.apache.eagle.log.entity.repo.EntityRepository; - -public class JPMEntityRepository extends EntityRepository { - public JPMEntityRepository() { - entitySet.add(SparkAppEntity.class); - entitySet.add(SparkJobEntity.class); - entitySet.add(SparkStageEntity.class); - entitySet.add(SparkTaskEntity.class); - entitySet.add(SparkExecutorEntity.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java deleted file mode 100644 index e18f1e7..0000000 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.jpm.spark.running.entities; - -import java.io.Serializable; -import java.util.HashMap; - -public class JobConfig extends HashMap<String, String> implements Serializable { -} -