Repository: incubator-eagle
Updated Branches:
  refs/heads/master 020a5b3cf -> cfd5c7f82


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapInputFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapInputFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapInputFunc.java
new file mode 100644
index 0000000..1a92600
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapInputFunc.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+
+public class MapInputFunc extends AbstractInputFunc {
+
+    public MapInputFunc() {
+        super(JobCounters.CounterName.HDFS_BYTES_READ, 
Constants.SuggestionType.MapInput);
+    }
+
+    public MapInputFunc(double threshold) {
+        super(JobCounters.CounterName.HDFS_BYTES_READ, 
Constants.SuggestionType.MapInput, threshold);
+    }
+
+    @Override
+    protected MRTaskExecutionResponse.TaskGroup 
getTasks(MRTaskExecutionResponse.TaskGroupResponse tasks) {
+        return tasks.tasksGroupByType.get(Constants.TaskType.MAP.toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapSpillFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapSpillFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapSpillFunc.java
new file mode 100644
index 0000000..e30f860
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/MapSpillFunc.java
@@ -0,0 +1,90 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import 
org.apache.eagle.service.jpm.MRTaskExecutionResponse.JobSuggestionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroupResponse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Suggestion function that compares the average {@code SPLIT_RAW_BYTES} counter value
+ * of a job's long map tasks against that of its short map tasks and emits a suggestion
+ * when the ratio exceeds a configurable threshold.
+ *
+ * <p>NOTE(review): the class and message texts talk about "spill", while the counter
+ * read is {@code SPLIT_RAW_BYTES}; confirm this is the intended counter.
+ */
+public class MapSpillFunc implements SuggestionFunc {
+
+    /** Ratio used when the caller supplies no threshold or a non-positive one. */
+    private static final double SPILL_DEVIATION_THRESHOLD = 2;
+
+    private static final String SPILL_RATIO_NAME_FORMAT = "average %s deviation";
+    private static final String SPILL_SUGGESTION_FORMAT = "map spill deviation exceeds threshold %.2f, where the deviation is %.2f / %.2f";
+
+    private final double threshold;
+
+    /** Creates the function with the default deviation threshold. */
+    public MapSpillFunc() {
+        this.threshold = SPILL_DEVIATION_THRESHOLD;
+    }
+
+    /**
+     * Creates the function with a custom deviation threshold.
+     *
+     * @param threshold ratio above which a suggestion is produced; any value that is
+     *                  not greater than zero falls back to the default
+     */
+    public MapSpillFunc(double threshold) {
+        this.threshold = threshold > 0 ? threshold : SPILL_DEVIATION_THRESHOLD;
+    }
+
+    /** Returns the task group stored under the MAP task type. */
+    private MRTaskExecutionResponse.TaskGroup getTasks(TaskGroupResponse data) {
+        return data.tasksGroupByType.get(Constants.TaskType.MAP.toString());
+    }
+
+    /**
+     * Averages the {@code SPLIT_RAW_BYTES} counter over the given tasks.
+     *
+     * @param tasks tasks to average over; may be null or empty
+     * @return the arithmetic mean of the counter, or 0 when there are no tasks
+     */
+    private double getAverageSpillBytes(List<TaskExecutionAPIEntity> tasks) {
+        if (tasks == null || tasks.isEmpty()) {
+            return 0;
+        }
+        long totalSpillBytes = 0;
+        for (TaskExecutionAPIEntity task : tasks) {
+            totalSpillBytes += task.getJobCounters().getCounterValue(JobCounters.CounterName.SPLIT_RAW_BYTES);
+        }
+        // Widen before dividing: long / int would truncate the average to a whole number.
+        return (double) totalSpillBytes / tasks.size();
+    }
+
+    /**
+     * Builds the suggestion response for the map tasks contained in {@code data}.
+     *
+     * @param data task groups keyed by task type; the MAP group's short and long tasks are read
+     * @return a response of type {@code MapSpill} carrying exactly one result
+     */
+    @Override
+    public JobSuggestionResponse apply(TaskGroupResponse data) {
+        MRTaskExecutionResponse.TaskGroup tasks = getTasks(data);
+
+        double smallerSpillBytes = getAverageSpillBytes(tasks.shortTasks);
+        double largerSpillBytes = getAverageSpillBytes(tasks.longTasks);
+
+        JobSuggestionResponse response = new JobSuggestionResponse();
+        response.suggestionType = Constants.SuggestionType.MapSpill.toString();
+        response.suggestionResults = getSpillSuggest(smallerSpillBytes, largerSpillBytes);
+        return response;
+    }
+
+    /**
+     * Computes the long/short ratio and wraps it in a single-element result list.
+     *
+     * @param smallerSpillBytes average for the short tasks
+     * @param largerSpillBytes  average for the long tasks
+     * @return one result holding the ratio and, when it exceeds the threshold, a
+     *         formatted suggestion text (otherwise the text is {@code null})
+     */
+    private List<MRTaskExecutionResponse.SuggestionResult> getSpillSuggest(double smallerSpillBytes, double largerSpillBytes) {
+        // A zero denominator is replaced by 1 so the ratio stays finite.
+        if (smallerSpillBytes == 0) {
+            smallerSpillBytes = 1;
+        }
+        double deviation = largerSpillBytes / smallerSpillBytes;
+        String suggestion = null;
+        if (deviation > threshold) {
+            suggestion = String.format(SPILL_SUGGESTION_FORMAT, threshold, largerSpillBytes, smallerSpillBytes);
+        }
+        List<MRTaskExecutionResponse.SuggestionResult> suggestionResults = new ArrayList<>();
+        String suggestName = String.format(SPILL_RATIO_NAME_FORMAT, JobCounters.CounterName.SPLIT_RAW_BYTES.getName());
+        suggestionResults.add(new MRTaskExecutionResponse.SuggestionResult(suggestName, deviation, suggestion));
+        return suggestionResults;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceGCFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceGCFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceGCFunc.java
new file mode 100644
index 0000000..61c8e12
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceGCFunc.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+
+public class ReduceGCFunc extends AbstractGCFunc {
+
+    public ReduceGCFunc() {
+        super(Constants.SuggestionType.ReduceGC);
+    }
+
+    public ReduceGCFunc(double threshold) {
+        super(Constants.SuggestionType.ReduceGC, threshold);
+    }
+
+    @Override
+    protected MRTaskExecutionResponse.TaskGroup 
getTasks(MRTaskExecutionResponse.TaskGroupResponse tasks) {
+        return 
tasks.tasksGroupByType.get(Constants.TaskType.REDUCE.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceInputFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceInputFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceInputFunc.java
new file mode 100644
index 0000000..2397eb3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/ReduceInputFunc.java
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+
+public class ReduceInputFunc extends AbstractInputFunc {
+
+    public ReduceInputFunc() {
+        super(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES, 
Constants.SuggestionType.ReduceInput);
+    }
+
+    public ReduceInputFunc(double threshold) {
+        super(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES, 
Constants.SuggestionType.ReduceInput, threshold);
+    }
+
+    @Override
+    protected MRTaskExecutionResponse.TaskGroup 
getTasks(MRTaskExecutionResponse.TaskGroupResponse tasks) {
+        return 
tasks.tasksGroupByType.get(Constants.TaskType.REDUCE.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/SuggestionFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/SuggestionFunc.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/SuggestionFunc.java
new file mode 100644
index 0000000..4a84d35
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/suggestion/SuggestionFunc.java
@@ -0,0 +1,27 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import 
org.apache.eagle.service.jpm.MRTaskExecutionResponse.JobSuggestionResponse;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse.TaskGroupResponse;
+
+/**
+ * Contract for a single job-tuning check: an implementation inspects grouped task
+ * data and reports its findings as a {@link JobSuggestionResponse}.
+ */
+public interface SuggestionFunc {
+
+    /**
+     * Evaluates this check against the supplied task groups.
+     *
+     * @param data the grouped task data to analyse
+     * @return the suggestion response produced by this check
+     */
+    JobSuggestionResponse apply(TaskGroupResponse data);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
deleted file mode 100644
index c8d8869..0000000
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.common.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestJobCountPerBucketHelper {
-    MRJobCountHelper helper = new MRJobCountHelper();
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TestJobCountPerBucketHelper.class);
-
-    @Test
-    public void test() throws ParseException {
-        String timeString = "2016-08-22 20:13:00";
-        long timestamp = DateTimeUtil.humanDateToSeconds(timeString);
-        String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
-        Assert.assertTrue(timeString2.equals(timeString));
-
-        String timeString3 = helper.moveTimeForwardOneDay(timeString);
-        Assert.assertTrue(timeString3.equals("2016-08-21 20:13:00"));
-
-        String timeString4 = helper.moveTimeForwardOneDay(timeString3);
-        Assert.assertTrue(timeString4.equals("2016-08-20 20:13:00"));
-    }
-
-    @Test
-    public void test2() throws ParseException {
-        String startTime = "2016-08-22 20:13:00";
-        String endTime = "2016-08-22 24:13:00";
-        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new 
ArrayList<>();
-        helper.initJobCountList(jobCounts, 
DateTimeUtil.humanDateToSeconds(startTime), 
DateTimeUtil.humanDateToSeconds(endTime), 15 * 60);
-        /*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
-            LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
-        }*/
-        
Assert.assertTrue(DateTimeUtil.millisecondsToHumanDateWithSeconds(jobCounts.get(1).timeBucket).equals("2016-08-22
 20:15:00"));
-    }
-
-    @Test
-    public void test3() {
-        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new 
ArrayList<>();
-        long intervalSecs = 5;
-        helper.initJobCountList(jobCounts, 3, 31, intervalSecs);
-        helper.countJob(jobCounts, 5, 10, intervalSecs, "hive");
-        helper.countJob(jobCounts, 13, 18, intervalSecs, "hive");
-        helper.countJob(jobCounts, 18, 28, intervalSecs, "hive");
-        helper.countJob(jobCounts, 25, 33, intervalSecs, "hive");
-        Assert.assertTrue(jobCounts.size() == 7);
-        Assert.assertTrue(jobCounts.get(1).jobCount == 1);
-        Assert.assertTrue(jobCounts.get(5).jobCount == 2);
-    }
-
-    @Test
-    public void test4() throws ParseException {
-        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new 
ArrayList<>();
-        long intervalSecs = 60 * 15;
-        String startTime = "2016-08-22 20:13:00";
-        String endTime = "2016-08-22 24:13:00";
-        helper.initJobCountList(jobCounts, 
DateTimeUtil.humanDateToSeconds(startTime), 
DateTimeUtil.humanDateToSeconds(endTime), intervalSecs);
-        helper.countJob(jobCounts,
-                DateTimeUtil.humanDateToSeconds("2016-08-22 20:23:00"),
-                DateTimeUtil.humanDateToSeconds("2016-08-22 20:30:00"),
-                intervalSecs,
-                "hive");
-        Assert.assertTrue(jobCounts.get(2).jobCount == 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
deleted file mode 100644
index 2cd0b8e..0000000
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
-import org.apache.eagle.jpm.util.Constants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestTaskCountPerJobHelper {
-    TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
-
-    @Test
-    public void test() {
-        String timeList = " 0, 10,20,40 ";
-        List<Long> times = helper.parseTimeList(timeList);
-        Assert.assertTrue(times.size() == 4);
-
-        long val = 25 * 1000;
-        int index = helper.getPosition(times, val);
-        Assert.assertTrue(index == 2);
-    }
-
-    @Test
-    public void test2() {
-        TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
-        test1.setDuration(15 * 1000);
-        test1.setTaskStatus("running");
-        TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
-        test4.setDuration(13 * 1000);
-        test4.setTaskStatus("running");
-        TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
-        test2.setDuration(0 * 1000);
-        test2.setEndTime(100);
-        test2.setTaskStatus("x");
-        TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
-        test3.setDuration(19 * 1000);
-        test3.setTaskStatus("running");
-        TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
-        test5.setDuration(20 * 1000);
-        test5.setEndTime(28);
-        test5.setTaskStatus("x");
-        List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
-        tasks.add(test1);
-        tasks.add(test2);
-        tasks.add(test3);
-        tasks.add(test4);
-        tasks.add(test5);
-
-        List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new 
ArrayList<>();
-        List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new 
ArrayList<>();
-
-        String timeList = " 0, 10,20,40 ";
-        List<Long> times = helper.parseTimeList(timeList);
-
-        helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, 
new TaskCountByDurationHelper.RunningTaskComparator());
-
-        for (TaskExecutionAPIEntity o : tasks) {
-            int index = helper.getPosition(times, o.getDuration());
-            if 
(o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
-                MRJobTaskCountResponse.UnitTaskCount counter = 
runningTaskCount.get(index);
-                counter.taskCount++;
-                counter.entities.add(o);
-            } else if (o.getEndTime() != 0) {
-                MRJobTaskCountResponse.UnitTaskCount counter = 
finishedTaskCount.get(index);
-                counter.taskCount++;
-                counter.entities.add(o);
-            }
-        }
-        int top = 2;
-        if (top > 0)  {
-            helper.getTopTasks(runningTaskCount, top);
-        }
-        Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
-        Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRJobCountImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRJobCountImpl.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRJobCountImpl.java
new file mode 100644
index 0000000..f21fb5c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRJobCountImpl.java
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.count;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Unit tests for the time-bucket helpers of {@link MRJobCountImpl}.
+ */
+public class TestMRJobCountImpl {
+
+    MRJobCountImpl helper = new MRJobCountImpl();
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestMRJobCountImpl.class);
+
+    /**
+     * Checks the string/seconds round trip of {@link DateTimeUtil} and that each call to
+     * {@code moveTimeForwardOneDay} yields the same time of day on the previous calendar day.
+     */
+    @Test
+    public void test() throws ParseException {
+        String timeString = "2016-08-22 20:13:00";
+        long timestamp = DateTimeUtil.humanDateToSeconds(timeString);
+        String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
+        // assertEquals reports expected vs. actual on failure, unlike assertTrue(a.equals(b)).
+        Assert.assertEquals(timeString, timeString2);
+
+        String timeString3 = helper.moveTimeForwardOneDay(timeString);
+        Assert.assertEquals("2016-08-21 20:13:00", timeString3);
+
+        String timeString4 = helper.moveTimeForwardOneDay(timeString3);
+        Assert.assertEquals("2016-08-20 20:13:00", timeString4);
+    }
+
+    /**
+     * Initializes 15-minute buckets for a window starting at 20:13:00 and checks that the
+     * bucket at index 1 is labelled 20:15:00.
+     */
+    @Test
+    public void test2() throws ParseException {
+        String startTime = "2016-08-22 20:13:00";
+        String endTime = "2016-08-22 24:13:00";
+        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+        helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), 15 * 60);
+        /*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
+            LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
+        }*/
+        Assert.assertEquals("2016-08-22 20:15:00", DateTimeUtil.millisecondsToHumanDateWithSeconds(jobCounts.get(1).timeBucket));
+    }
+
+    /**
+     * Counts four jobs into 5-second buckets over the range 3..31 and checks the number
+     * of buckets and the job count of two of them.
+     */
+    @Test
+    public void test3() {
+        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+        long intervalSecs = 5;
+        helper.initJobCountList(jobCounts, 3, 31, intervalSecs);
+        helper.countJob(jobCounts, 5, 10, intervalSecs, "hive");
+        helper.countJob(jobCounts, 13, 18, intervalSecs, "hive");
+        helper.countJob(jobCounts, 18, 28, intervalSecs, "hive");
+        helper.countJob(jobCounts, 25, 33, intervalSecs, "hive");
+        Assert.assertEquals(7, jobCounts.size());
+        Assert.assertTrue(jobCounts.get(1).jobCount == 1);
+        Assert.assertTrue(jobCounts.get(5).jobCount == 2);
+    }
+
+    /**
+     * Counts one job running from 20:23:00 to 20:30:00 into 15-minute buckets and checks
+     * that the bucket at index 2 registers it.
+     */
+    @Test
+    public void test4() throws ParseException {
+        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+        long intervalSecs = 60 * 15;
+        String startTime = "2016-08-22 20:13:00";
+        String endTime = "2016-08-22 24:13:00";
+        helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), intervalSecs);
+        helper.countJob(jobCounts,
+                DateTimeUtil.humanDateToSeconds("2016-08-22 20:23:00"),
+                DateTimeUtil.humanDateToSeconds("2016-08-22 20:30:00"),
+                intervalSecs,
+                "hive");
+        Assert.assertTrue(jobCounts.get(2).jobCount == 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRTaskCountImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRTaskCountImpl.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRTaskCountImpl.java
new file mode 100644
index 0000000..50c5914
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/count/TestMRTaskCountImpl.java
@@ -0,0 +1,129 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.count;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse;
+import org.apache.eagle.service.jpm.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Unit tests for {@link MRTaskCountImpl} and the distribution helpers of
+ * {@link ResourceUtils} that it is used with here.
+ */
+public class TestMRTaskCountImpl {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestMRTaskCountImpl.class);
+    MRTaskCountImpl helper = new MRTaskCountImpl();
+
+    /**
+     * Parses a whitespace-padded distribution list and checks that the value 25 is
+     * placed at index 2 of the boundaries 0, 10, 20, 40.
+     */
+    @Test
+    public void test() {
+        String timeList = " 0, 10,20,40 ";
+        List<Long> times = ResourceUtils.parseDistributionList(timeList);
+        Assert.assertEquals(4, times.size());
+
+        long val = 25;
+        int index = ResourceUtils.getDistributionPosition(times, val);
+        Assert.assertEquals(2, index);
+    }
+
+    /**
+     * Distributes three running and two finished tasks over duration buckets, then
+     * checks the running count of bucket 1 and that only the requested number of top
+     * entities is kept for it.
+     */
+    @Test
+    public void test2() {
+        TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
+        test1.setDuration(15 * 1000);
+        test1.setTaskStatus("running");
+        TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
+        test4.setDuration(13 * 1000);
+        test4.setTaskStatus("running");
+        TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
+        test2.setDuration(0 * 1000);
+        test2.setEndTime(100);
+        test2.setTaskStatus("x");
+        TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
+        test3.setDuration(19 * 1000);
+        test3.setTaskStatus("running");
+        TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
+        test5.setDuration(20 * 1000);
+        test5.setEndTime(28);
+        test5.setTaskStatus("x");
+        List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
+        tasks.add(test1);
+        tasks.add(test2);
+        tasks.add(test3);
+        tasks.add(test4);
+        tasks.add(test5);
+
+        List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
+        List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
+
+        String timeList = " 0, 10,20,40 ";
+        List<Long> times = ResourceUtils.parseDistributionList(timeList);
+
+        helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new MRTaskCountImpl.RunningTaskComparator());
+
+        for (TaskExecutionAPIEntity o : tasks) {
+            // Durations are set in milliseconds above; the boundaries are compared in seconds.
+            int index = ResourceUtils.getDistributionPosition(times, o.getDuration() / DateTimeUtil.ONESECOND);
+            if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
+                MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+                counter.taskCount++;
+                counter.entities.add(o);
+            } else if (o.getEndTime() != 0) {
+                MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                counter.taskCount++;
+                counter.entities.add(o);
+            }
+        }
+        int top = 2;
+        helper.getTopTasks(runningTaskCount, top);
+        Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
+        Assert.assertEquals(2, runningTaskCount.get(1).topEntities.size());
+    }
+
+    /**
+     * Checks that the counter-parameter regex splits a parameter string into
+     * (counter name, value list) pairs in order of appearance.
+     */
+    @Test
+    public void testCounterParametersParser() {
+        String paramString = "HDFS_BYTES_READ:[0,20.0,40], MAP_OUTPUT_BYTES:[20, 40], REDUCE_INPUT_RECORDS: [0, 199], REDUCE_OUTPUT_RECORDS: [3,10]";
+        String patternString = "(\\w+):\\s*\\[([\\d,\\.\\s]+)\\]";
+        Pattern pattern = Pattern.compile(patternString);
+        Matcher matcher = pattern.matcher(paramString);
+        List<String> expectedResults = new ArrayList<>(Arrays.asList("HDFS_BYTES_READ", "0,20.0,40", "MAP_OUTPUT_BYTES",
+                "20, 40", "REDUCE_INPUT_RECORDS", "0, 199", "REDUCE_OUTPUT_RECORDS", "3,10"));
+        int i = 0;
+        while (matcher.find()) {
+            LOG.info(matcher.group(0));
+            LOG.info(matcher.group(1));
+            LOG.info(matcher.group(2));
+            Assert.assertEquals(expectedResults.get(i++), matcher.group(1));
+            Assert.assertEquals(expectedResults.get(i++), matcher.group(2));
+        }
+        // Without this check the test would pass even if the pattern matched nothing.
+        Assert.assertEquals(expectedResults.size(), i);
+    }
+
+    /** Checks that a counter name string resolves to the matching enum constant. */
+    @Test
+    public void testGetCounterName() {
+        JobCounters.CounterName counterName = JobCounters.CounterName.valueOf("HDFS_BYTES_READ");
+        Assert.assertEquals(JobCounters.CounterName.HDFS_BYTES_READ, counterName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestDataSkewFunc.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestDataSkewFunc.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestDataSkewFunc.java
new file mode 100644
index 0000000..ce114fe
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestDataSkewFunc.java
@@ -0,0 +1,89 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResource;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestDataSkewFunc {
+    MRTaskExecutionResource resource = new MRTaskExecutionResource();
+
+    public TaskExecutionAPIEntity createTestTask(String taskType, long 
duration, long hdfsReadByte, long reduceShuffleByte) {
+        TaskExecutionAPIEntity task = new TaskExecutionAPIEntity();
+        task.setDuration(duration);
+        task.setTags(new HashMap<String, String>());
+        task.getTags().put(MRJobTagName.TASK_TYPE.toString(), taskType);
+        JobCounters jobCounters = new JobCounters();
+        Map<String, Map<String, Long>> counters = new HashMap<>();
+        Map<String, Long> taskCounters = new HashMap<>();
+        taskCounters.put(JobCounters.CounterName.HDFS_BYTES_READ.getName(), 
hdfsReadByte);
+        
taskCounters.put(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES.getName(), 
reduceShuffleByte);
+
+        if (hdfsReadByte != 0) {
+            counters.put(JobCounters.GroupName.FileSystemCounters.getName(), 
taskCounters);
+        }
+        if (reduceShuffleByte != 0) {
+            counters.put(JobCounters.GroupName.MapReduceTaskCounter.getName(), 
taskCounters);
+        }
+        jobCounters.setCounters(counters);
+        task.setJobCounters(jobCounters);
+        return task;
+    }
+
+    @Test
+    public void testDataSkewFunc() {
+        MRTaskExecutionResponse.TaskGroupResponse response = new 
MRTaskExecutionResponse.TaskGroupResponse();
+        response.tasksGroupByType = new HashMap<>();
+        response.tasksGroupByType.put(Constants.TaskType.MAP.toString(), new 
MRTaskExecutionResponse.TaskGroup());
+        response.tasksGroupByType.put(Constants.TaskType.REDUCE.toString(), 
new MRTaskExecutionResponse.TaskGroup());
+
+        List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
+        tasks.add(createTestTask(Constants.TaskType.MAP.toString(), 9480, 327, 
12860L));
+        tasks.add(createTestTask(Constants.TaskType.MAP.toString(), 27619, 
379682, 379682));
+        tasks.add(createTestTask(Constants.TaskType.REDUCE.toString(), 19699, 
45, 908));
+        tasks.add(createTestTask(Constants.TaskType.REDUCE.toString(), 35688, 
45, 293797));
+        tasks.add(createTestTask(Constants.TaskType.REDUCE.toString(), 31929, 
45, 2062520));
+        MRTaskExecutionResponse.TaskGroupResponse taskGroup = 
resource.groupTasksByValue(response, true, tasks, 10000);
+
+        List<MRTaskExecutionResponse.JobSuggestionResponse> result = new 
ArrayList<>();
+
+        List<SuggestionFunc> suggestionFuncs = new ArrayList<>();
+        suggestionFuncs.add(new MapInputFunc());
+        suggestionFuncs.add(new ReduceInputFunc());
+        try {
+            for (SuggestionFunc func : suggestionFuncs) {
+                result.add(func.apply(taskGroup));
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+        Assert.assertTrue(result.size() > 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestTaskCounterFunc.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestTaskCounterFunc.java
 
b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestTaskCounterFunc.java
new file mode 100644
index 0000000..f88618a
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/suggestion/TestTaskCounterFunc.java
@@ -0,0 +1,88 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm.suggestion;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.service.jpm.MRTaskExecutionResource;
+import org.apache.eagle.service.jpm.MRTaskExecutionResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TestTaskCounterFunc {
+    MRTaskExecutionResource resource = new MRTaskExecutionResource();
+
+    public TaskExecutionAPIEntity createTestTask(String taskType, long 
duration, long gcMs, long cpuMs, long spillRecords, long mapOutput) {
+        TaskExecutionAPIEntity task = new TaskExecutionAPIEntity();
+        task.setDuration(duration);
+        task.setTags(new HashMap<>());
+        task.getTags().put(MRJobTagName.TASK_TYPE.toString(), taskType);
+        JobCounters jobCounters = new JobCounters();
+        Map<String, Map<String, Long>> counters = new HashMap<>();
+        Map<String, Long> taskCounters = new HashMap<>();
+        taskCounters.put(JobCounters.CounterName.GC_MILLISECONDS.getName(), 
gcMs);
+        taskCounters.put(JobCounters.CounterName.CPU_MILLISECONDS.getName(), 
cpuMs);
+        taskCounters.put(JobCounters.CounterName.SPLIT_RAW_BYTES.getName(), 
spillRecords);
+        taskCounters.put(JobCounters.CounterName.MAP_OUTPUT_RECORDS.getName(), 
mapOutput);
+
+        counters.put(JobCounters.GroupName.MapReduceTaskCounter.getName(), 
taskCounters);
+        jobCounters.setCounters(counters);
+        task.setJobCounters(jobCounters);
+        return task;
+    }
+
+    @Test
+    public void testTaskGroupByValue() {
+        MRTaskExecutionResponse.TaskGroupResponse response = new 
MRTaskExecutionResponse.TaskGroupResponse();
+        response.tasksGroupByType = new HashMap<>();
+        response.tasksGroupByType.put(Constants.TaskType.MAP.toString(), new 
MRTaskExecutionResponse.TaskGroup());
+        response.tasksGroupByType.put(Constants.TaskType.REDUCE.toString(), 
new MRTaskExecutionResponse.TaskGroup());
+
+        List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
+        tasks.add(createTestTask(Constants.TaskType.MAP.toString(), 2132259, 
300L, 12860L, 0L, 2091930L));
+        tasks.add(createTestTask(Constants.TaskType.MAP.toString(), 42071, 
800, 21010L, 0L, 2092547L));
+        tasks.add(createTestTask(Constants.TaskType.REDUCE.toString(), 19699, 
300, 3320, 0L, 0L));
+        MRTaskExecutionResponse.TaskGroupResponse taskGroup = 
resource.groupTasksByValue(response, true, tasks, 30000);
+
+        List<MRTaskExecutionResponse.JobSuggestionResponse> result = new 
ArrayList<>();
+
+        List<SuggestionFunc> suggestionFuncs = new ArrayList<>();
+        suggestionFuncs.add(new MapGCFunc());
+        suggestionFuncs.add(new ReduceGCFunc());
+        suggestionFuncs.add(new MapSpillFunc());
+        try {
+            for (SuggestionFunc func : suggestionFuncs) {
+                result.add(func.apply(taskGroup));
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+        Assert.assertTrue(result.size() > 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 7505cd1..393a97c 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.spark.history;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValue;
+import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -84,8 +85,11 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
         this.eagleInfo.username = 
config.getString("eagleProps.eagle.service.username");
         this.eagleInfo.password = 
config.getString("eagleProps.eagle.service.password");
         this.eagleInfo.timeout = 
config.getInt("eagleProps.eagle.service.read.timeout");
+        this.eagleInfo.basePath = EagleServiceBaseClient.DEFAULT_BASE_PATH;
+        if (config.hasPath("eagleProps.eagle.service.basePath")) {
+            this.eagleInfo.basePath = 
config.getString("eagleProps.eagle.service.basePath");
+        }
 
-        this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
         this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
         this.stormConfig.spoutCrawlInterval = 
config.getInt("storm.spoutCrawlInterval");
     }
@@ -113,7 +117,6 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
     }
 
     public static class StormConfig implements Serializable {
-        public int timeoutSec;
         public int spoutPending;
         public int spoutCrawlInterval;
     }
@@ -123,6 +126,7 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
         public int port;
         public String username;
         public String password;
+        public String basePath;
         public int timeout;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
index b4a403e..82b7607 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -699,6 +699,7 @@ public class JHFSparkEventReader {
     private EagleServiceBaseClient initiateClient() {
         client = new EagleServiceClientImpl(config.eagleInfo.host,
             config.eagleInfo.port,
+            config.eagleInfo.basePath,
             config.eagleInfo.username,
             config.eagleInfo.password);
         int timeout = config.eagleInfo.timeout;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
index b1dd09c..8c12633 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
@@ -47,14 +47,9 @@ public class JHFSparkParser implements JHFParserBase {
                 isValidJson = true;
                 JSONObject eventObj = parseAndValidateJSON(line);
                 if (isValidJson) {
-                    try {
-                        this.eventReader.read(eventObj);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
+                    this.eventReader.read(eventObj);
                 }
             }
-
             this.eventReader.clearReader();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 4f98996..2ce8522 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -70,9 +70,9 @@ public class JobHistoryZKStateManager {
     public List<String> loadApplications(int limit) {
         String jobPath = zkRoot + "/jobs";
         List<String> apps = new ArrayList<>();
-        InterProcessLock lock = new 
InterProcessReadWriteLock(curator,jobPath).writeLock();
+        //InterProcessLock lock = new 
InterProcessReadWriteLock(curator,jobPath).writeLock();
         try {
-            lock.acquire();
+            //lock.acquire();
             Iterator<String> iter = 
curator.getChildren().forPath(jobPath).iterator();
             while (iter.hasNext()) {
                 String appId = iter.next();
@@ -91,11 +91,11 @@ public class JobHistoryZKStateManager {
             LOG.error("fail to read unprocessed jobs", e);
             throw new RuntimeException(e);
         } finally {
-            try {
+           /* try {
                 lock.release();
             } catch (Exception e) {
                 LOG.error("fail to release lock", e);
-            }
+            }*/
 
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index 94f6bbf..9f8adc7 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -105,12 +105,12 @@ public class SparkHistoryJobParseBolt extends 
BaseRichBolt {
                 }
             }
 
-            zkState.updateApplicationStatus(appId, 
ZKStateConstant.AppStatus.FINISHED);
+            //zkState.updateApplicationStatus(appId, 
ZKStateConstant.AppStatus.FINISHED);
             LOG.info("Successfully parse application {}", appId);
             collector.ack(tuple);
         } catch (RuntimeException e) {
             LOG.warn("fail to process application {} due to RuntimeException, 
ignore it", appId, e);
-            zkState.updateApplicationStatus(appId, 
ZKStateConstant.AppStatus.FINISHED);
+            //zkState.updateApplicationStatus(appId, 
ZKStateConstant.AppStatus.FINISHED);
             collector.ack(tuple);
         } catch (Exception e) {
             LOG.error("Fail to process application {}, and retry", appId, e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index afc0b90..02c68b9 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -65,13 +65,13 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
     @SuppressWarnings("unchecked")
     @Override
     public void nextTuple() {
-        LOG.info("Start to run tuple");
+        //LOG.info("Start to run tuple");
         try {
             Calendar calendar = Calendar.getInstance();
             long fetchTime = calendar.getTimeInMillis();
             calendar.setTimeInMillis(this.lastFinishAppTime);
-            LOG.info("Last finished time = {}", calendar.getTime());
             if (fetchTime - this.lastFinishAppTime > 
this.config.stormConfig.spoutCrawlInterval) {
+                LOG.info("Last finished time = {}", calendar.getTime());
                 List<AppInfo> appInfos = 
rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, 
Long.toString(lastFinishAppTime));
                 if (appInfos != null) {
                     LOG.info("Get " + appInfos.size() + " from yarn resource 
manager.");
@@ -94,7 +94,7 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
             }
 
             if (appIds.isEmpty()) {
-                this.takeRest(10);
+                this.takeRest(5);
             } else {
                 LOG.info("{} apps sent.", appIds.size());
             }
@@ -107,7 +107,7 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
         try {
             Thread.sleep(seconds * 1000);
         } catch (InterruptedException e) {
-            LOG.warn("exception found {}", e);
+            LOG.warn("exception found", e);
         }
     }
 
@@ -119,15 +119,16 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
     @Override
     public void fail(Object msgId) {
         // Sleep 3 seconds and retry.
-        Utils.sleep(3000);
-
+        // Utils.sleep(3000);
         collector.emit(new Values(msgId), msgId);
-        zkState.updateApplicationStatus((String)msgId, 
ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+        zkState.updateApplicationStatus((String)msgId, 
ZKStateConstant.AppStatus.FAILED);
+        LOG.warn("fail {}", msgId.toString());
     }
 
     @Override
     public void ack(Object msgId) {
-
+        zkState.updateApplicationStatus((String)msgId, 
ZKStateConstant.AppStatus.FINISHED);
+        LOG.info("ack {}", msgId.toString());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 4b8887a..9f76da2 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -152,12 +152,6 @@
             <value>http://sandbox.hortonworks.com:8088</value>
         </property>
         <property>
-            <name>storm.messageTimeoutSec</name>
-            <displayName>messageTimeoutSec</displayName>
-            <description>Message timeout (in seconds)</description>
-            <value>3000</value>
-        </property>
-        <property>
             <name>storm.pendingSpout</name>
             <displayName>pendingSpout</displayName>
             <value>1000</value>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index baf559b..10da2d0 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -18,6 +18,7 @@
   "appId": "sparkHistoryJob",
   "mode": "CLUSTER",
   "workers" : 3,
+  topology.message.timeout.secs: 300,
   "basic":{
     "cluster":"sandbox",
     "dataCenter":"sandbox",
@@ -28,6 +29,7 @@
     eagle.service.port: 9099,
     eagle.service.username: "admin",
     eagle.service.password : "secret",
+    eagle.service.basePath : "/rest",
     eagle.service.read.timeout : 2
   },
   "zkStateConfig" : {
@@ -53,7 +55,6 @@
     }
   },
   "storm":{
-    "messageTimeoutSec": 3000,
     "pendingSpout": 1000,
     "spoutCrawlInterval": 10000,#in ms
     "parallelismConfig" : {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 8c561bd..73100a9 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -96,6 +96,10 @@ public class Constants {
         COMPLETE_MR_JOB
     }
 
+    /**
+     * Categories of job suggestion.
+     * NOTE(review): presumably one constant per suggestion function
+     * (map/reduce input, map spill, map/reduce GC) - confirm against the suggestion package.
+     */
+    public static enum SuggestionType {
+        MapInput, ReduceInput, MapSpill, MapGC, ReduceGC;
+    }
+
     public static final String TASK_RUNNING = "RUNNING";
     public static final String TASK_FINISHED = "FINISHED";
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
index b4bf600..8def44f 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
@@ -19,6 +19,7 @@
 package org.apache.eagle.jpm.util.jobcounter;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -45,4 +46,81 @@ public final class JobCounters implements Serializable {
         }
         counters.clear();
     }
+
+    public Long getCounterValue(CounterName counterName) {
+        return counters.get(counterName.group.name).get(counterName.name);
+    }
+
+    /**
+     * Counter groups, each pairing the fully qualified Hadoop group name with a
+     * short display name.
+     */
+    public static enum GroupName {
+        FileSystemCounters("org.apache.hadoop.mapreduce.FileSystemCounter", "FileSystemCounters"),
+        MapReduceTaskCounter("org.apache.hadoop.mapreduce.TaskCounter", "MapReduceTaskCounter"),
+        MapReduceJobCounter("org.apache.hadoop.mapreduce.JobCounter", "MapReduceJobCounter");
+
+        // Enum constants are shared singletons, so their state is kept immutable.
+        private final String name;
+        private final String displayName;
+
+        GroupName(String name, String displayName) {
+            this.name = name;
+            this.displayName = displayName;
+        }
+
+        /** Returns the fully qualified group name, e.g. {@code org.apache.hadoop.mapreduce.TaskCounter}. */
+        public String getName() {
+            return name;
+        }
+
+        /** Returns the short display name of the group. */
+        public String getDisplayName() {
+            return displayName;
+        }
+    }
+
+    /**
+     * Known counters. Each constant records the group it belongs to, the counter's
+     * name (the key used inside the group's counter map) and a display name.
+     */
+    public static enum CounterName {
+
+        FILE_BYTES_READ(GroupName.FileSystemCounters, "FILE_BYTES_READ", "FILE_BYTES_READ"),
+        FILE_BYTES_WRITTEN(GroupName.FileSystemCounters, "FILE_BYTES_WRITTEN", "FILE_BYTES_WRITTEN"),
+        HDFS_BYTES_READ(GroupName.FileSystemCounters, "HDFS_BYTES_READ", "HDFS_BYTES_READ"),
+        HDFS_BYTES_WRITTEN(GroupName.FileSystemCounters, "HDFS_BYTES_WRITTEN", "HDFS_BYTES_WRITTEN"),
+
+        MAP_INPUT_RECORDS(GroupName.MapReduceTaskCounter, "MAP_INPUT_RECORDS", "Map input records"),
+        MAP_OUTPUT_RECORDS(GroupName.MapReduceTaskCounter, "MAP_OUTPUT_RECORDS", "Map output records"),
+        MAP_OUTPUT_BYTES(GroupName.MapReduceTaskCounter, "MAP_OUTPUT_BYTES", "Map output bytes"),
+        MAP_OUTPUT_MATERIALIZED_BYTES(GroupName.MapReduceTaskCounter,
+            "MAP_OUTPUT_MATERIALIZED_BYTES", "Map output materialized bytes"),
+        SPLIT_RAW_BYTES(GroupName.MapReduceTaskCounter, "SPLIT_RAW_BYTES", "SPLIT_RAW_BYTES"),
+
+        REDUCE_INPUT_GROUPS(GroupName.MapReduceTaskCounter, "REDUCE_INPUT_GROUPS", "Reduce input groups"),
+        REDUCE_SHUFFLE_BYTES(GroupName.MapReduceTaskCounter, "REDUCE_SHUFFLE_BYTES", "Reduce shuffle bytes"),
+        REDUCE_OUTPUT_RECORDS(GroupName.MapReduceTaskCounter, "REDUCE_OUTPUT_RECORDS", "Reduce output records"),
+        REDUCE_INPUT_RECORDS(GroupName.MapReduceTaskCounter, "REDUCE_INPUT_RECORDS", "Reduce input records"),
+
+        COMBINE_INPUT_RECORDS(GroupName.MapReduceTaskCounter, "COMBINE_INPUT_RECORDS", "Combine input records"),
+        COMBINE_OUTPUT_RECORDS(GroupName.MapReduceTaskCounter, "COMBINE_OUTPUT_RECORDS", "Combine output records"),
+        SPILLED_RECORDS(GroupName.MapReduceTaskCounter, "SPILLED_RECORDS", "Spilled Records"),
+
+        CPU_MILLISECONDS(GroupName.MapReduceTaskCounter, "CPU_MILLISECONDS", "CPU time spent (ms)"),
+        // The constant is called GC_MILLISECONDS but the counter key is GC_TIME_MILLIS.
+        GC_MILLISECONDS(GroupName.MapReduceTaskCounter, "GC_TIME_MILLIS", "GC time elapsed (ms)"),
+        COMMITTED_HEAP_BYTES(GroupName.MapReduceTaskCounter,
+            "COMMITTED_HEAP_BYTES", "Total committed heap usage (bytes)"),
+        PHYSICAL_MEMORY_BYTES(GroupName.MapReduceTaskCounter,
+            "PHYSICAL_MEMORY_BYTES", "Physical memory (bytes) snapshot"),
+        VIRTUAL_MEMORY_BYTES(GroupName.MapReduceTaskCounter,
+            "VIRTUAL_MEMORY_BYTES", "Virtual memory (bytes) snapshot");
+
+        // Enum constants are shared singletons, so their state is kept immutable.
+        private final GroupName group;
+        private final String name;
+        private final String displayName;
+
+        CounterName(GroupName group, String name, String displayName) {
+            this.group = group;
+            this.name = name;
+            this.displayName = displayName;
+        }
+
+        /** Returns the counter's key inside its group's counter map. */
+        public String getName() {
+            return name;
+        }
+
+        /** Returns the display name of the counter. */
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        /**
+         * Returns the enum constant name of the owning group (e.g. {@code FileSystemCounters}).
+         * NOTE(review): this is {@code group.name()}, not {@code group.getName()}; the
+         * fully qualified Hadoop group name is what getCounterValue uses as the map key.
+         * Confirm which one callers expect.
+         */
+        public String getGroupName() {
+            return group.name();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cfd5c7f8/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index bae8ea2..0413b34 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -156,9 +156,9 @@
                 <activeByDefault>false</activeByDefault>
             </activation>
 
-        <!-- 
=========================================================================================================
-             TODO: Decouple following specific application related 
dependencies into independent module: eagle-develop
-             
=========================================================================================================
 -->
+            <!-- 
=========================================================================================================
+                 TODO: Decouple following specific application related 
dependencies into independent module: eagle-develop
+                 
=========================================================================================================
 -->
 
             <dependencies>
                 <!-- App: Oozie  Auditlog monitoring -->
@@ -208,6 +208,11 @@
                     <artifactId>eagle-jpm-mr-running</artifactId>
                     <version>${project.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.eagle</groupId>
+                    <artifactId>eagle-jpm-service</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>

Reply via email to