http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java index 2684899..e0ec330 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java @@ -18,8 +18,8 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -27,12 +27,12 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa") @ColumnFamily("f") @Prefix("jexec") -@Service(JPAConstants.JPA_JOB_EXECUTION_SERVICE_NAME) +@Service(Constants.JPA_JOB_EXECUTION_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) @Indexes({ - @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true), - @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false) + @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false) }) public class JobExecutionAPIEntity extends JobBaseAPIEntity { @Column("a") @@ -55,6 +55,22 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity { private int numFinishedReduces; @Column("j") private JobCounters jobCounters; + @Column("k") + 
private int dataLocalMaps; + @Column("l") + private double dataLocalMapsPercentage; + @Column("m") + private int rackLocalMaps; + @Column("n") + private double rackLocalMapsPercentage; + @Column("o") + private int totalLaunchedMaps; + @Column("p") + private long submissionTime; + @Column("q") + private long lastMapDuration; + @Column("r") + private long lastReduceDuration; public String getCurrentState() { return currentState; @@ -129,4 +145,76 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity { this.jobCounters = jobCounters; _pcs.firePropertyChange("jobCounters", null, null); } + + public int getDataLocalMaps() { + return dataLocalMaps; + } + + public void setDataLocalMaps(int dataLocalMaps) { + this.dataLocalMaps = dataLocalMaps; + valueChanged("dataLocalMaps"); + } + + public double getDataLocalMapsPercentage() { + return dataLocalMapsPercentage; + } + + public void setDataLocalMapsPercentage(double dataLocalMapsPercentage) { + this.dataLocalMapsPercentage = dataLocalMapsPercentage; + valueChanged("dataLocalMapsPercentage"); + } + + public int getRackLocalMaps() { + return rackLocalMaps; + } + + public void setRackLocalMaps(int rackLocalMaps) { + this.rackLocalMaps = rackLocalMaps; + valueChanged("rackLocalMaps"); + } + + public double getRackLocalMapsPercentage() { + return rackLocalMapsPercentage; + } + + public void setRackLocalMapsPercentage(double rackLocalMapsPercentage) { + this.rackLocalMapsPercentage = rackLocalMapsPercentage; + valueChanged("rackLocalMapsPercentage"); + } + + public int getTotalLaunchedMaps() { + return totalLaunchedMaps; + } + + public void setTotalLaunchedMaps(int totalLaunchedMaps) { + this.totalLaunchedMaps = totalLaunchedMaps; + valueChanged("totalLaunchedMaps"); + } + + public long getSubmissionTime() { + return submissionTime; + } + + public void setSubmissionTime(long submissionTime) { + this.submissionTime = submissionTime; + valueChanged("submissionTime"); + } + + public long getLastMapDuration() { + return 
lastMapDuration; + } + + public void setLastMapDuration(long lastMapDuration) { + this.lastMapDuration = lastMapDuration; + valueChanged("lastMapDuration"); + } + + public long getLastReduceDuration() { + return lastReduceDuration; + } + + public void setLastReduceDuration(long lastReduceDuration) { + this.lastReduceDuration = lastReduceDuration; + valueChanged("lastReduceDuration"); + } }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java index 2400c55..9e8a372 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa_process") @ColumnFamily("f") @Prefix("process") -@Service(JPAConstants.JPA_JOB_PROCESS_TIME_STAMP_NAME) +@Service(Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME) @TimeSeries(true) @Partition({"site"}) public class JobProcessTimeStampEntity extends TaggedLogAPIEntity { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java index 
9769620..929a98f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa_anomaly") @ColumnFamily("f") @Prefix("tacount") -@Service(JPAConstants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME) +@Service(Constants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java index 77994a5..abc28e2 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java @@ -18,8 +18,8 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.Constants; +import 
org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa_task") @ColumnFamily("f") @Prefix("taexec") -@Service(JPAConstants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME) +@Service(Constants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) @Indexes({ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java index f287688..c1f71b8 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java @@ -18,8 +18,8 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa_task") @ColumnFamily("f") @Prefix("texec") -@Service(JPAConstants.JPA_TASK_EXECUTION_SERVICE_NAME) +@Service(Constants.JPA_TASK_EXECUTION_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) public class TaskExecutionAPIEntity extends JobBaseAPIEntity { 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java index 5ae67c0..7456522 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.history.entities; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.meta.*; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @Table("eaglejpa_anomaly") @ColumnFamily("f") @Prefix("taskfailurecount") -@Service(JPAConstants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME) +@Service(Constants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME) @TimeSeries(true) @Partition({"site"}) public class TaskFailureCountAPIEntity extends JobBaseAPIEntity { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java deleted file mode 100644 index 1c1c759..0000000 --- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.jobcounter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -/** - * MR Job counter dictionary. It's singlton class that will try to read JobCounter.conf file and configure - * counters. 
- * - */ -public final class CounterGroupDictionary { - - private final List<CounterGroupKey> groupKeys = new ArrayList<>(); - - private static volatile CounterGroupDictionary instance = null; - private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class); - - private CounterGroupDictionary() {} - - public static CounterGroupDictionary getInstance() throws JobCounterException { - if (instance == null) { - synchronized (CounterGroupDictionary.class) { - if (instance == null) { - CounterGroupDictionary tmp = new CounterGroupDictionary(); - tmp.initialize(); - instance = tmp; - } - } - } - return instance; - } - - public CounterGroupKey getCounterGroupByName(String groupName) { - for (CounterGroupKey groupKey : groupKeys) { - if (groupKey.getName().equalsIgnoreCase(groupName)) { - return groupKey; - } - } - return null; - } - - public CounterGroupKey getCounterGroupByIndex(int groupIndex) { - if (groupIndex < 0 || groupIndex >= groupKeys.size()) { - return null; - } - return groupKeys.get(groupIndex); - } - - private void initialize() throws JobCounterException { - // load config.properties file from classpath - InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf"); - try { - if (is == null) { - is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf"); - if (is == null) { - final String errMsg = "Failed to load JobCounter.conf"; - LOG.error(errMsg); - throw new JobCounterException(errMsg); - } - } - final Properties prop = new Properties(); - try { - prop.load(is); - } catch(Exception ex) { - final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage(); - LOG.error(errMsg, ex); - throw new JobCounterException(errMsg, ex); - } - int groupIndex = 0; - while (parseGroup(groupIndex, prop)) { - ++groupIndex; - } - } finally { - if (is != null) { - try { - is.close(); - } catch (IOException e) { - } - } - } - } - - private boolean parseGroup(int groupIndex, Properties 
prop) { - final String groupKeyBase = "counter.group" + groupIndex; - final String groupNameKey = groupKeyBase + ".name"; - final String groupName = prop.getProperty(groupNameKey); - - if (groupName == null) { - return false; - } - - final String groupDescriptionKey = groupKeyBase + ".description"; - final String groupDescription = prop.getProperty(groupDescriptionKey); - final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription); - final ArrayList<CounterKey> counters = new ArrayList<CounterKey>(); - - int counterIndex = 0; - while (parseCounter(groupKey, counterIndex, counters, prop)) { - ++counterIndex; - } - groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()])); - groupKeys.add(groupKey); - return true; - } - - private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) { - final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex; - final String counterNameKey = counterKeyBase + ".names"; - final String counterNamesString = prop.getProperty(counterNameKey); - - if (counterNamesString == null) { - return false; - } - final String[] names = counterNamesString.split(","); - final List<String> counterNames = new ArrayList<String>(); - for (String name : names) { - counterNames.add(name.trim()); - } - - final String counterDescriptionKey = counterKeyBase + ".description"; - final String counterDescription = prop.getProperty(counterDescriptionKey); - - CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey); - counters.add(counter); - return true; - } - - private static class CounterKeyImpl implements CounterKey { - private final int index; - private final List<String> counterNames; - private final String description; - private final CounterGroupKey groupKey; - - public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) { 
- this.index = index; - this.counterNames = counterNames; - this.description = description; - this.groupKey = groupKey; - } - @Override - public int getIndex() { - return index; - } - @Override - public List<String> getNames() { - return counterNames; - } - @Override - public String getDescription() { - return description; - } - @Override - public CounterGroupKey getGroupKey() { - return groupKey; - } - } - - private static class CounterGroupKeyImpl implements CounterGroupKey { - private final int index; - private final String name; - private final String description; - private CounterKey[] counterKeys; - - public CounterGroupKeyImpl(int index, String name, String description) { - this.index = index; - this.name = name; - this.description = description; - } - - public void setCounterKeys(CounterKey[] counterKeys) { - this.counterKeys = counterKeys; - } - - @Override - public int getIndex() { - return index; - } - @Override - public String getName() { - return name; - } - @Override - public String getDescription() { - return description; - } - @Override - public int getCounterNumber() { - return counterKeys.length; - } - @Override - public List<CounterKey> listCounterKeys() { - return Arrays.asList(counterKeys); - } - @Override - public CounterKey getCounterKeyByName(String name) { - for (CounterKey counterKey : counterKeys) { - for (String n : counterKey.getNames()) { - if (n.equalsIgnoreCase(name)) { - return counterKey; - } - } - } - return null; - } - @Override - public CounterKey getCounterKeyByID(int index) { - if (index < 0 || index >= counterKeys.length) { - return null; - } - return counterKeys[index]; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java deleted file mode 100644 index 82606d1..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.eagle.jpm.mr.history.jobcounter; - -import java.util.List; - -public interface CounterGroupKey { - - String getName(); - String getDescription(); - int getIndex(); - int getCounterNumber(); - List<CounterKey> listCounterKeys(); - CounterKey getCounterKeyByName(String name); - CounterKey getCounterKeyByID(int index); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java deleted file mode 100644 index 161490f..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.eagle.jpm.mr.history.jobcounter; - -import java.util.List; - -public interface CounterKey { - - List<String> getNames(); - String getDescription(); - int getIndex(); - CounterGroupKey getGroupKey(); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java deleted file mode 100644 index 5ffaf51..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.eagle.jpm.mr.history.jobcounter; - -public class JobCounterException extends Exception { - - /** - * - */ - private static final long serialVersionUID = -4525162176188266862L; - - /** - * Default constructor of JobCounterException - */ - public JobCounterException() { - super(); - } - - /** - * Constructor of JobCounterException - * - * @param message error message - */ - public JobCounterException(String message) { - super(message); - } - - /** - * Constructor of JobCounterException - * - * @param message error message - * @param cause the cause of the exception - * - */ - public JobCounterException(String message, Throwable cause) { - super(message, cause); - } - - /** - * Constructor of JobCounterException - * - * @param cause the cause of the exception - */ - public JobCounterException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java deleted file mode 100644 index 2806cf1..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.jobcounter; - -import java.util.Map; -import java.util.TreeMap; - - -public final class JobCounters { - - private Map<String, Map<String, Long>> counters = new TreeMap<>(); - - public Map<String, Map<String, Long>> getCounters() { - return counters; - } - - public void setCounters(Map<String, Map<String, Long>> counters) { - this.counters = counters; - } - - public String toString(){ - return counters.toString(); - } - - public void clear() { - for (Map.Entry<String, Map<String, Long>> entry : counters.entrySet()) { - entry.getValue().clear(); - } - counters.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java deleted file mode 100644 index 9d13fb4..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.parser; - -public enum EagleJobTagName { - SITE("site"), - RACK("rack"), - HOSTNAME("hostname"), - JOB_NAME("jobName"), - NORM_JOB_NAME("normJobName"), - JOB_ID("jobID"), - TASK_ID("taskID"), - TASK_ATTEMPT_ID("taskAttemptID"), - JOB_STATUS("jobStatus"), - USER("user"), - TASK_TYPE("taskType"), - TASK_EXEC_TYPE("taskExecType"), - ERROR_CATEGORY("errorCategory"), - JOB_QUEUE("queue"), - RULE_TYPE("ruleType"), - JOB_TYPE("jobType"); - - private String tagName; - private EagleJobTagName(String tagName) { - this.tagName = tagName; - } - - public String toString() { - - return this.tagName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index 31bfdb5..6442699 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -18,11 +18,13 @@ package 
org.apache.eagle.jpm.mr.history.parser; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; -import org.apache.eagle.jpm.mr.history.common.JobConfig; +import org.apache.eagle.jpm.mr.history.entities.JobConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.entities.*; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.JobNameNormalization; +import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.jobhistory.EventType; @@ -56,10 +58,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl // hostname to rack mapping protected Map<String, String> m_host2RackMapping; - protected String m_jobID; + protected String m_jobId; protected String m_jobName; protected String m_jobType; - protected String m_normJobName; + protected String m_jobDefId; protected String m_user; protected String m_queueName; protected Long m_jobLauchTime; @@ -69,12 +71,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl protected final Configuration configuration; - public JPAConstants.JobType fetchJobType(Configuration config) { - if (config.get(JPAConstants.JobConfiguration.CASCADING_JOB) != null) { return JPAConstants.JobType.CASCADING; } - if (config.get(JPAConstants.JobConfiguration.HIVE_JOB) != null) { return JPAConstants.JobType.HIVE; } - if (config.get(JPAConstants.JobConfiguration.PIG_JOB) != null) { return JPAConstants.JobType.PIG; } - if (config.get(JPAConstants.JobConfiguration.SCOOBI_JOB) != null) {return JPAConstants.JobType.SCOOBI; } - return JPAConstants.JobType.NOTAVALIABLE; + public Constants.JobType fetchJobType(Configuration config) { + if 
(config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; } + if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { return Constants.JobType.HIVE; } + if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { return Constants.JobType.PIG; } + if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {return Constants.JobType.SCOOBI; } + return Constants.JobType.NOTAVALIABLE; } /** @@ -121,7 +123,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl public void close() throws IOException { // check if this job history file is complete if (m_jobExecutionEntity.getEndTime() == 0L) { - throw new IOException(new JHFWriteNotCompletedException(m_jobID)); + throw new IOException(new JHFWriteNotCompletedException(m_jobId)); } try { flush(); @@ -142,7 +144,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl * @param id */ private void setJobID(String id) { - this.m_jobID = id; + this.m_jobId = id; } private void setJobType(String jobType) { @@ -152,10 +154,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception { String id = values.get(Keys.JOBID); - if (m_jobID == null) { + if (m_jobId == null) { setJobID(id); - } else if (!m_jobID.equals(id)) { - String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobID + "'"; + } else if (!m_jobId.equals(id)) { + String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'"; LOG.error(msg); throw new ImportException(msg); } @@ -165,51 +167,63 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl m_user = values.get(Keys.USER); m_queueName = values.get(Keys.JOB_QUEUE); m_jobName = values.get(Keys.JOBNAME); - m_normJobName = m_jobName; + m_jobDefId = m_jobName; - 
LOG.info("NormJobName of " + id + ": " + m_normJobName); + // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate. + String jobDefId = null; + if(configuration != null ) jobDefId = configuration.get(Constants.JOB_DEFINITION_ID_KEY); - m_jobSubmitEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); - m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); - m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); - m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); - m_jobSubmitEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); - m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + if(jobDefId == null) { + m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName); + } else { + LOG.debug("Got normJobName from job configuration for " + id + ": " + jobDefId); + m_jobDefId = jobDefId; + } + + LOG.info("NormJobName of " + id + ": " + m_jobDefId); + + m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); entityCreated(m_jobSubmitEventEntity); } else if(values.get(Keys.LAUNCH_TIME) != null) { // job launched m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME))); m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp(); - m_jobLaunchEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); - 
m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); - m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); - m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); - m_jobLaunchEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); - m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS)); m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES)); entityCreated(m_jobLaunchEventEntity); } else if(values.get(Keys.FINISH_TIME) != null) { // job finished m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME))); - m_jobFinishEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); - m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); - m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); - m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); - m_jobFinishEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); - m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + 
m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); entityCreated(m_jobFinishEventEntity); // populate jobExecutionEntity entity - m_jobExecutionEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); - m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); - m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); - m_jobExecutionEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); - m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_QUEUE.toString(), m_queueName); - m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType); + m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS)); m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp()); m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp()); m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); + m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp()); if (values.get(Keys.FAILED_MAPS) != null) { // for Artemis 
m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS))); @@ -223,7 +237,27 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps); m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces); if (values.get(Keys.COUNTERS) != null || totalCounters != null) { - m_jobExecutionEntity.setJobCounters(parseCounters(totalCounters)); + JobCounters jobCounters = parseCounters(totalCounters); + m_jobExecutionEntity.setJobCounters(jobCounters); + if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) { + Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER); + if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) { + m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue()); + } + + if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) { + m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue()); + } + + if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) { + m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue()); + } + } + + if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) { + m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); + m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); + } } entityCreated(m_jobExecutionEntity); } @@ -261,13 +295,13 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl final String taskID = values.get(Keys.TASKID); Map<String, String> taskBaseTags = new HashMap<String, String>(){{ - put(EagleJobTagName.TASK_TYPE.toString(), taskType); - 
put(EagleJobTagName.USER.toString(), m_user); - //put(EagleJobTagName.JOB_NAME.toString(), _jobName); - put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); - put(EagleJobTagName.JOB_TYPE.toString(), m_jobType); - put(EagleJobTagName.JOB_ID.toString(), m_jobID); - put(EagleJobTagName.TASK_ID.toString(), taskID); + put(MRJobTagName.TASK_TYPE.toString(), taskType); + put(MRJobTagName.USER.toString(), m_user); + //put(MRJobTagName.JOB_NAME.toString(), _jobName); + put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + put(MRJobTagName.JOB_TYPE.toString(), m_jobType); + put(MRJobTagName.JOB_ID.toString(), m_jobId); + put(MRJobTagName.TASK_ID.toString(), taskID); }}; taskBaseTags.putAll(m_baseTags); if (recType == RecordTypes.Task && startTime != null) { // task start, no host is assigned yet @@ -278,8 +312,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl Map<String, String> taskExecutionTags = new HashMap<>(taskBaseTags); String hostname = m_taskRunningHosts.get(taskID); hostname = (hostname == null) ? 
"" : hostname; // TODO if task fails, then no hostname - taskExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname); - taskExecutionTags.put(EagleJobTagName.RACK.toString(), m_host2RackMapping.get(hostname)); + taskExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname); + taskExecutionTags.put(MRJobTagName.RACK.toString(), m_host2RackMapping.get(hostname)); entity.setTags(taskExecutionTags); entity.setStartTime(m_taskStartTime.get(taskID)); entity.setEndTime(Long.valueOf(finishTime)); @@ -290,6 +324,13 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl if (values.get(Keys.COUNTERS) != null || counters != null) { entity.setJobCounters(parseCounters(counters)); } + long duration = entity.getEndTime() - m_jobSubmitEventEntity.getTimestamp(); + if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > m_jobExecutionEntity.getLastMapDuration()) { + m_jobExecutionEntity.setLastMapDuration(duration); + } + if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > m_jobExecutionEntity.getLastReduceDuration()) { + m_jobExecutionEntity.setLastReduceDuration(duration); + } entityCreated(entity); //_taskStartTime.remove(taskID); // clean this taskID } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start @@ -300,8 +341,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl entity.setTags(taskAttemptExecutionTags); String hostname = values.get(Keys.HOSTNAME); String rack = values.get(Keys.RACK); - taskAttemptExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname); - taskAttemptExecutionTags.put(EagleJobTagName.RACK.toString(), rack); + taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname); + taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack); // put last attempt's hostname to task level m_taskRunningHosts.put(taskID, hostname); // it is very likely that an 
attempt ID could be both succeeded and failed due to M/R system @@ -345,17 +386,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl if (matchMustHaveKeyPatterns(prop)) { JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity(); jobConfigurationEntity.setTags(new HashMap<>(m_baseTags)); - jobConfigurationEntity.getTags().put(EagleJobTagName.USER.toString(), m_user); - jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID); - jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName); - jobConfigurationEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName); - jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(), m_jobType); + jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), m_jobType); jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); JobConfig jobConfig = new JobConfig(); jobConfig.setConfig(prop); jobConfigurationEntity.setJobConfig(jobConfig); - jobConfigurationEntity.setConfigJobName(m_normJobName); + jobConfigurationEntity.setConfigJobName(m_jobDefId); entityCreated(jobConfigurationEntity); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java index 7fe8909..278deca 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java @@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.parser; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counter; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java index 6fd2c18..2d960b0 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java @@ -19,15 +19,12 @@ package org.apache.eagle.jpm.mr.history.parser; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.hadoop.classification.InterfaceAudience; 
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.jobhistory.*; -import org.apache.hadoop.util.StringInterner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java index e485cc8..38ca35c 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java @@ -20,11 +20,11 @@ package org.apache.eagle.jpm.mr.history.parser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.eagle.jpm.mr.history.common.JPAConstants; import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity; import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity; -import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,16 +83,16 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi Map<String,Long> mapTaskAttemptCounter = 
this.m_mapTaskAttemptCounterAgg.result(); if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>(); - mapTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration); - counters.put(JPAConstants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter); + mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration); + counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter); Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result(); if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>(); - reduceTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration); - counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter); + reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration); + counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter); - counters.put(JPAConstants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result()); - counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result()); + counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result()); + counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result()); jobCounters.setCounters(counters); @@ -105,18 +105,18 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity entity) { JobCounters jobCounters = entity.getJobCounters(); - String taskType = entity.getTags().get(JPAConstants.JOB_TASK_TYPE_TAG); + String taskType = entity.getTags().get(Constants.JOB_TASK_TYPE_TAG); if (taskType != null && jobCounters != null && 
jobCounters.getCounters() != null) { - if (JPAConstants.TaskType.MAP.toString().equals(taskType.toUpperCase())) { + if (Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) { m_mapAttemptDuration += entity.getDuration(); - this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER)); - this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER)); + this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER)); + this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER)); return; - } else if (JPAConstants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) { + } else if (Constants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) { m_reduceAttemptDuration += entity.getDuration(); - this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER)); - this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER)); + this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER)); + this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER)); return; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java index 23e7072..94de068 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; import org.apache.eagle.jpm.mr.history.entities.TaskAttemptCounterAPIEntity; import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import org.slf4j.Logger; @@ -74,12 +75,12 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity; Map<String, String> tags = new HashMap<>(); - tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString())); - tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString())); - tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString())); - tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString())); - tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString())); - tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString())); + tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString())); + tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString())); + tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString())); + tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString())); + tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString())); + tags.put(MRJobTagName.TASK_TYPE.toString(), 
e.getTags().get(MRJobTagName.TASK_TYPE.toString())); CounterKey key = new CounterKey(); key.tags = tags; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index 14cc882..177fdc1 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity; import org.apache.eagle.jpm.mr.history.entities.TaskFailureCountAPIEntity; +import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import org.slf4j.Logger; @@ -80,17 +81,17 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity(); Map<String, String> tags = new HashMap<>(); failureTask.setTags(tags); - tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString())); - tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString())); - tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString())); - tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString())); 
- tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString())); - tags.put(EagleJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID()); - tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString())); + tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString())); + tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString())); + tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString())); + tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString())); + tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString())); + tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID()); + tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString())); //TODO need optimize, match and then capture the data final String errCategory = classifier.classifyError(e.getError()); - tags.put(EagleJobTagName.ERROR_CATEGORY.toString(), errCategory); + tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory); failureTask.setError(e.getError()); failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf index db62cfb..3a08c52 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf @@ -110,6 +110,9 @@ counter.group2.counter10.names = SLOTS_MILLIS_MAPS counter.group2.counter10.description = Total vcore-seconds taken by all reduce tasks 
counter.group2.counter11.names = SLOTS_MILLIS_REDUCES counter.group2.counter11.description = Total vcore-seconds taken by all reduce tasks +counter.group2.counter12.names = RACK_LOCAL_MAPS +counter.group2.counter12.description = Total vcore-seconds taken by all reduce tasks + counter.group3.name = MapTaskAttemptCounter counter.group3.description = Reduce Task Attempt Counter Aggregation http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index 8cb1aa3..1b97271 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -62,7 +62,7 @@ "password": "secret" } }, - + "MRConfigureKeys" : [ "mapreduce.map.output.compress", "mapreduce.map.output.compress.codec", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml new file mode 100644 index 0000000..3c8aa92 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml @@ -0,0 +1,126 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>eagle-jpm-parent</artifactId> + <groupId>org.apache.eagle</groupId> + <version>0.5.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>eagle-jpm-mr-running</artifactId> + <name>eagle-jpm-mr-running</name> + <url>http://maven.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-api</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.wso2.orbit.com.lmax</groupId> + <artifactId>disruptor</artifactId> + </exclusion> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-base</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.wso2.orbit.com.lmax</groupId> + <artifactId>disruptor</artifactId> + </exclusion> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-job-common</artifactId> + 
<version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> + + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-util</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/assembly/eagle-jpm-mr-running-assembly.xml</descriptor> + <finalName>eagle-jpm-mr-running-${project.version}</finalName> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <tarLongFileMode>posix</tarLongFileMode> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml b/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml new file mode 100644 index 0000000..66133a0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. 
See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>assembly</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>false</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + <!--includes> + <include>org.apache.hadoop:hadoop-common</include> + <include>org.apache.hadoop:hadoop-hdfs</include> + <include>org.apache.hadoop:hadoop-client</include> + <include>org.apache.hadoop:hadoop-auth</include> + <include>org.apache.eagle:eagle-stream-process-api</include> + <include>org.apache.eagle:eagle-stream-process-base</include> + <include>org.jsoup:jsoup</include> + </includes--> + <excludes> + <exclude>org.wso2.orbit.com.lmax:disruptor</exclude> + <exclude>asm:asm</exclude> + <exclude>org.apache.storm:storm-core</exclude> + </excludes> + </dependencySet> + </dependencySets> + <fileSets> + <fileSet> + <directory>${project.build.outputDirectory}/</directory> + 
<outputDirectory>/</outputDirectory> + <!--<includes>--> + <!--<include>*.conf</include>--> + <!--<include>*.xml</include>--> + <!--<include>*.properties</include>--> + <!--<include>*.config</include>--> + <!--<include>classes/META-INF/*</include>--> + <!--</includes>--> + + <excludes> + <exclude>*.yaml</exclude> + </excludes> + </fileSet> + </fileSets> +</assembly> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java new file mode 100644 index 0000000..fb8b805 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/
+
+package org.apache.eagle.jpm.mr.running;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Command-line entry point that builds and submits the MapReduce running-job Storm topology:
+ * an {@link MRRunningJobFetchSpout} feeding an {@link MRRunningJobParseBolt}, with tuples
+ * routed to the bolt by the {@code appId} field.
+ */
+public class MRRunningJobMain {
+    private static final String SPOUT_NAME = "mrRunningJobFetchSpout";
+    private static final String BOLT_NAME = "mrRunningJobParseBolt";
+
+    /**
+     * Loads the configuration, builds the topology and submits it to an in-process
+     * {@link LocalCluster} when {@code envContextConfig.env} is {@code "local"}, or via
+     * {@link StormSubmitter} otherwise.
+     *
+     * @param args command-line arguments, forwarded to {@link MRRunningConfigManager#getInstance(String[])}
+     * @throws IllegalStateException if configuration, topology construction or submission fails;
+     *         the original exception is attached as the cause
+     */
+    public static void main(String[] args) {
+        try {
+            //1. trigger init conf
+            MRRunningConfigManager mrRunningConfigManager = MRRunningConfigManager.getInstance(args);
+
+            // Defensive copy: do not rely on the config library handing back a mutable list.
+            List<String> confKeyKeys = new ArrayList<>(
+                    mrRunningConfigManager.getConfig().getStringList("MRConfigureKeys.jobConfigKey"));
+            confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+            confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+            confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+            confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+
+            //2. init topology
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            String topologyName = mrRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
+
+            // For each component the requested parallelism (executor count) is capped at its
+            // task count, since Storm cannot run more executors than a component has tasks.
+            int parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + SPOUT_NAME);
+            int tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + SPOUT_NAME);
+            topologyBuilder.setSpout(
+                    SPOUT_NAME,
+                    new MRRunningJobFetchSpout(
+                            mrRunningConfigManager.getJobExtractorConfig(),
+                            mrRunningConfigManager.getEndpointConfig(),
+                            mrRunningConfigManager.getZkStateConfig()),
+                    Math.min(parallelism, tasks)
+            ).setNumTasks(tasks);
+
+            parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + BOLT_NAME);
+            tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + BOLT_NAME);
+            topologyBuilder.setBolt(
+                    BOLT_NAME,
+                    new MRRunningJobParseBolt(
+                            mrRunningConfigManager.getEagleServiceConfig(),
+                            mrRunningConfigManager.getEndpointConfig(),
+                            mrRunningConfigManager.getJobExtractorConfig(),
+                            mrRunningConfigManager.getZkStateConfig(),
+                            confKeyKeys),
+                    Math.min(parallelism, tasks)
+            ).setNumTasks(tasks).fieldsGrouping(SPOUT_NAME, new Fields("appId"));
+
+            Config config = new Config();
+            config.setNumWorkers(mrRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
+            // NOTE(review): topology debug output is always enabled; consider making it configurable.
+            config.put(Config.TOPOLOGY_DEBUG, true);
+            if ("local".equals(mrRunningConfigManager.getEnv())) {
+                //local mode
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            } else {
+                //cluster mode
+                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            }
+        } catch (Exception e) {
+            // Propagate (with cause) instead of printing the stack trace and returning normally,
+            // so a failed start-up is not mistaken for a successful submission.
+            throw new IllegalStateException("Failed to start the MR running job topology", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
new file mode 100644
index 0000000..05e7812
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.config;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+
+import java.io.Serializable;
+
+/**
+ * Shared holder for the MR running-job application configuration.
+ *
+ * <p>{@link #getInstance(String[])} (re)loads the configuration from the command-line
+ * arguments and exposes the raw {@link Config} together with typed views of the
+ * {@code zookeeperConfig}, {@code eagleProps.eagleService}, {@code jobExtractorConfig}
+ * and {@code dataSourceConfig} sections.
+ */
+public class MRRunningConfigManager implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
+
+    public String getEnv() {
+        return env;
+    }
+
+    private String env;
+
+    public ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+
+    private ZKStateConfig zkStateConfig;
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+
+    private EagleServiceConfig eagleServiceConfig;
+
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+
+    private JobExtractorConfig jobExtractorConfig;
+
+    public EndpointConfig getEndpointConfig() {
+        return endpointConfig;
+    }
+
+    private EndpointConfig endpointConfig;
+
+    /** Zookeeper settings, populated from the {@code zookeeperConfig} section. */
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+    }
+
+    /** Eagle service connection settings, populated from {@code eagleProps.eagleService}. */
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public int readTimeoutSeconds;
+        public int maxFlushNum;
+        public String username;
+        public String password;
+    }
+
+    /** Job extraction settings, populated from the {@code jobExtractorConfig} section. */
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public int fetchRunningJobInterval;
+        public int parseJobThreadPoolSize;
+        public int topAndBottomTaskByElapsedTime;
+    }
+
+    /** Data-source endpoints, populated from {@code dataSourceConfig.rmUrls}. */
+    public static class EndpointConfig implements Serializable {
+        public String[] rmUrls;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    private Config config;
+
+    private static MRRunningConfigManager manager = new MRRunningConfigManager();
+
+    private MRRunningConfigManager() {
+        this.eagleServiceConfig = new EagleServiceConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.endpointConfig = new EndpointConfig();
+        this.zkStateConfig = new ZKStateConfig();
+    }
+
+    /**
+     * Re-initializes the shared instance from {@code args} and returns it. Every call reloads
+     * the configuration.
+     *
+     * <p>NOTE(review): not synchronized; concurrent callers would race on the shared state.
+     *
+     * @param args command-line arguments understood by {@link ConfigOptionParser}
+     * @return the shared, freshly initialized instance
+     * @throws IllegalStateException if the configuration cannot be loaded
+     */
+    public static MRRunningConfigManager getInstance(String[] args) {
+        manager.init(args);
+        return manager;
+    }
+
+    /** Loads the configuration and copies every section into the typed holders. */
+    private void init(String[] args) {
+        loadConfig(args);
+
+        this.env = config.getString("envContextConfig.env");
+        parseZkStateConfig();
+        parseEagleServiceConfig();
+        parseJobExtractorConfig();
+        parseEndpointConfig();
+
+        LOG.info("Successfully initialized MRRunningConfigManager");
+        LOG.info("env: {}", this.env);
+        LOG.info("site: {}", this.jobExtractorConfig.site);
+        LOG.info("eagle.service.host: {}", this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: {}", this.eagleServiceConfig.eagleServicePort);
+    }
+
+    /** Parses {@code args} into {@link #config}, failing fast if that is not possible. */
+    private void loadConfig(String[] args) {
+        try {
+            LOG.info("Loading from configuration file");
+            this.config = new ConfigOptionParser().load(args);
+        } catch (Exception e) {
+            // Do not carry on: the lookups in init() would then hit a null config
+            // (NullPointerException) or silently reuse a config from an earlier call,
+            // hiding the real cause.
+            LOG.error("failed to load config", e);
+            throw new IllegalStateException("failed to load config", e);
+        }
+    }
+
+    /** Copies the {@code zookeeperConfig} section into {@link #zkStateConfig}. */
+    private void parseZkStateConfig() {
+        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+    }
+
+    /** Copies the {@code eagleProps.eagleService} section into {@link #eagleServiceConfig}. */
+    private void parseEagleServiceConfig() {
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        this.eagleServiceConfig.eagleServicePort = Integer.parseInt(config.getString("eagleProps.eagleService.port"));
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+    }
+
+    /** Copies the {@code jobExtractorConfig} section into {@link #jobExtractorConfig}. */
+    private void parseJobExtractorConfig() {
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+        this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
+        this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+    }
+
+    /** Copies {@code dataSourceConfig.rmUrls} into {@link #endpointConfig}. */
+    private void parseEndpointConfig() {
+        this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
+    }
+}