APACHE-KYLIN-2723: collect job related metrics

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/83d74c2e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/83d74c2e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/83d74c2e

Branch: refs/heads/yaho-cube-planner
Commit: 83d74c2e1fe214c0a612be20591917f11e126126
Parents: 594087a
Author: Zhong <nju_y...@apache.org>
Authored: Thu Aug 10 17:47:32 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Thu Aug 10 17:47:32 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  9 ++
 .../kylin/job/common/ShellExecutable.java       |  7 +-
 .../kylin/job/exception/ShellException.java     | 43 +++++++++
 .../job/execution/DefaultChainedExecutable.java | 18 ++--
 .../kylin/job/execution/ExecuteResult.java      | 30 ++++++
 .../metrics/job/JobRecordEventWrapper.java      | 23 +----
 engine-mr/pom.xml                               |  4 +
 .../org/apache/kylin/engine/mr/CubingJob.java   | 96 +++++++++++++++++++-
 .../engine/mr/common/HadoopShellExecutable.java |  8 +-
 .../engine/mr/common/MapReduceExecutable.java   |  9 +-
 .../mr/exception/HadoopShellException.java      | 44 +++++++++
 .../engine/mr/exception/MapReduceException.java | 43 +++++++++
 .../mr/exception/SegmentNotFoundException.java  | 44 +++++++++
 .../engine/mr/steps/MergeDictionaryStep.java    |  2 +-
 .../engine/mr/steps/MergeStatisticsStep.java    |  2 +-
 .../engine/mr/steps/SaveStatisticsStep.java     |  2 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  2 +-
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java  |  9 +-
 .../kylin/engine/spark/SparkExecutable.java     |  2 +-
 .../source/hive/CreateFlatHiveTableStep.java    |  2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |  8 +-
 .../hive/exception/SegmentEmptyException.java   | 44 +++++++++
 .../org/apache/kylin/source/jdbc/CmdStep.java   |  2 +-
 .../apache/kylin/source/jdbc/HiveCmdStep.java   |  2 +-
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../kylin/source/kafka/job/MergeOffsetStep.java |  2 +-
 .../storage/hbase/steps/DeprecatedGCStep.java   |  2 +-
 .../steps/HDFSPathGarbageCollectionStep.java    |  2 +-
 .../kylin/storage/hbase/steps/MergeGCStep.java  |  2 +-
 29 files changed, 409 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 61bef17..8df97ad 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1146,6 +1146,15 @@ abstract public class KylinConfigBase implements Serializable {
         return getDeployEnv();
     }
 
+    public String getKylinMetricsSubjectJob() {
+        return getOptional("kylin.core.metrics.subject-job", "METRICS_JOB") + 
"_" + getKylinMetricsSubjectSuffix();
+    }
+
+    public String getKylinMetricsSubjectJobException() {
+        return getOptional("kylin.core.metrics.subject-job-exception", 
"METRICS_JOB_EXCEPTION") + "_"
+                + getKylinMetricsSubjectSuffix();
+    }
+
     public String getKylinMetricsSubjectQuery() {
         return getOptional("kylin.core.metrics.subject-query", 
"METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 9f431b0..070ac23 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -19,8 +19,10 @@
 package org.apache.kylin.job.common;
 
 import java.io.IOException;
+
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.ShellException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
@@ -45,10 +47,11 @@ public class ShellExecutable extends AbstractExecutable {
             final PatternedLogger patternedLogger = new 
PatternedLogger(logger);
             final Pair<Integer, String> result = 
context.getConfig().getCliCommandExecutor().execute(getCmd(), patternedLogger);
             getManager().addJobInfo(getId(), patternedLogger.getInfo());
-            return new ExecuteResult(result.getFirst() == 0 ? 
ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
+            return new ExecuteResult(result.getFirst() == 0 ? 
ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED,
+                    new ShellException(result.getSecond()));
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with 
exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java b/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
new file mode 100644
index 0000000..443bd9b
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/exception/ShellException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.exception;
+
+/**
+ */
+public class ShellException extends Exception {
+
+    public ShellException() {
+    }
+
+    public ShellException(String message) {
+        super(message);
+    }
+
+    public ShellException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ShellException(Throwable cause) {
+        super(cause);
+    }
+
+    public ShellException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 3efa5a8..0ade541 100755
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -65,7 +65,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 return subTask.execute(context);
             }
         }
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
+        return new ExecuteResult(ExecuteResult.State.SUCCEED);
     }
 
     @Override
@@ -83,7 +83,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     @Override
     protected void onExecuteError(Throwable exception, ExecutableContext 
executableContext) {
         super.onExecuteError(exception, executableContext);
-        notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+        onStatusChange(executableContext, new ExecuteResult(exception), 
ExecutableState.ERROR);
     }
 
     @Override
@@ -92,10 +92,10 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
 
         if (isDiscarded()) {
             setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(executableContext, 
ExecutableState.DISCARDED);
+            onStatusChange(executableContext, result, 
ExecutableState.DISCARDED);
         } else if (isPaused()) {
             setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(executableContext, ExecutableState.STOPPED);
+            onStatusChange(executableContext, result, ExecutableState.STOPPED);
         } else if (result.succeed()) {
             List<? extends Executable> jobs = getTasks();
             boolean allSucceed = true;
@@ -120,11 +120,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
                 mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, 
null);
-                notifyUserStatusChange(executableContext, 
ExecutableState.SUCCEED);
+                onStatusChange(executableContext, result, 
ExecutableState.SUCCEED);
             } else if (hasError) {
                 setEndTime(System.currentTimeMillis());
                 mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, 
null);
-                notifyUserStatusChange(executableContext, 
ExecutableState.ERROR);
+                onStatusChange(executableContext, result, 
ExecutableState.ERROR);
             } else if (hasRunning) {
                 mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, 
null);
             } else if (hasDiscarded) {
@@ -136,10 +136,14 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         } else {
             setEndTime(System.currentTimeMillis());
             mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, 
result.output());
-            notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+            onStatusChange(executableContext, result, ExecutableState.ERROR);
         }
     }
 
+    protected void onStatusChange(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
+        notifyUserStatusChange(context, state);
+    }
+
     @Override
     public List<AbstractExecutable> getTasks() {
         return subTasks;

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index 760a574..139c04b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -30,6 +30,14 @@ public final class ExecuteResult {
 
     private final State state;
     private final String output;
+    private final Throwable throwable;
+
+    /**
+     * Default constructor to indicate a success ExecuteResult.
+     */
+    public ExecuteResult() {
+        this(State.SUCCEED, "succeed");
+    }
 
     public ExecuteResult(State state) {
         this(state, "");
@@ -39,6 +47,24 @@ public final class ExecuteResult {
         Preconditions.checkArgument(state != null, "state cannot be null");
         this.state = state;
         this.output = output;
+        this.throwable = null;
+    }
+
+    public ExecuteResult(State state, Throwable throwable) {
+        Preconditions.checkArgument(state != null, "state cannot be null");
+        this.state = state;
+        this.throwable = throwable;
+        this.output = throwable.getMessage();
+    }
+
+    public ExecuteResult(Throwable throwable) {
+        this(throwable, throwable.getMessage());
+    }
+
+    public ExecuteResult(Throwable throwable, String output) {
+        this.state = State.ERROR;
+        this.throwable = throwable;
+        this.output = output;
     }
 
     public State state() {
@@ -52,4 +78,8 @@ public final class ExecuteResult {
     public String output() {
         return output;
     }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
index 537388c..6cd197e 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
@@ -39,7 +39,7 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
         
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 0L);
         
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
0L);
         
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
0L);
-        setDependentStats();
+        this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 
0L);
     }
 
     public void setWrapper(String projectName, String cubeName, String jobId, 
String jobType, String cubingType) {
@@ -50,12 +50,13 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
         this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), 
cubingType);
     }
 
-    public void setStats(long tableSize, long cubeSize, long buildDuration, 
long waitResourceTime) {
+    public void setStats(long tableSize, long cubeSize, long buildDuration, 
long waitResourceTime,
+            double perBytesTimeCost) {
         this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), 
tableSize);
         this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
         this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 
buildDuration);
         this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 
waitResourceTime);
-        setDependentStats();
+        this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 
perBytesTimeCost);
     }
 
     public void setStepStats(long dColumnDistinct, long dDictBuilding, long 
dCubingInmem, long dHfileConvert) {
@@ -64,20 +65,4 @@ public class JobRecordEventWrapper extends RecordEventWrapper {
         
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
dCubingInmem);
         
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
dHfileConvert);
     }
-
-    private void setDependentStats() {
-        Long sourceSize = (Long) 
this.metricsEvent.get(JobPropertyEnum.SOURCE_SIZE.toString());
-        if (sourceSize != null && sourceSize != 0) {
-            if (sourceSize < MIN_SOURCE_SIZE) {
-                sourceSize = MIN_SOURCE_SIZE;
-            }
-            
this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(),
-                    ((Long) 
this.metricsEvent.get(JobPropertyEnum.BUILD_DURATION.toString())
-                            - (Long) 
this.metricsEvent.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString())) * 1.0
-                            / sourceSize);
-        } else {
-            
this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0.0);
-        }
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index e3aaa05..187d224 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -50,6 +50,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-storage</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 466f706..c4b6e12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -48,9 +48,15 @@ import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metrics.MetricsManager;
+import org.apache.kylin.metrics.job.ExceptionRecordEventWrapper;
+import org.apache.kylin.metrics.job.JobRecordEventWrapper;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 /**
  */
 public class CubingJob extends DefaultChainedExecutable {
@@ -61,6 +67,35 @@ public class CubingJob extends DefaultChainedExecutable {
         LAYER, INMEM
     }
 
+    public enum CubingJobTypeEnum {
+        BUILD("BUILD"), MERGE("MERGE");
+
+        private final String name;
+
+        CubingJobTypeEnum(String name) {
+            this.name = name;
+        }
+
+        public String toString() {
+            return name;
+        }
+
+        public static CubingJobTypeEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            for (CubingJobTypeEnum jobTypeEnum : CubingJobTypeEnum.values()) {
+                if (jobTypeEnum.name.equals(name.toUpperCase())) {
+                    return jobTypeEnum;
+                }
+            }
+            return null;
+        }
+    }
+
+    //32MB per block created by the first step
+    public static final long MIN_SOURCE_SIZE = 33554432L;
+
     // KEYS of Output.extraInfo map, info passed across job steps
     public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
     public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
@@ -68,13 +103,14 @@ public class CubingJob extends DefaultChainedExecutable {
     public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
     private static final String DEPLOY_ENV_NAME = "envName";
     private static final String PROJECT_INSTANCE_NAME = "projectName";
+    private static final String JOB_TYPE = "jobType";
 
     public static CubingJob createBuildJob(CubeSegment seg, String submitter, 
JobEngineConfig config) {
-        return initCubingJob(seg, "BUILD", submitter, config);
+        return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), 
submitter, config);
     }
 
     public static CubingJob createMergeJob(CubeSegment seg, String submitter, 
JobEngineConfig config) {
-        return initCubingJob(seg, "MERGE", submitter, config);
+        return initCubingJob(seg, CubingJobTypeEnum.MERGE.toString(), 
submitter, config);
     }
 
     private static CubingJob initCubingJob(CubeSegment seg, String jobType, 
String submitter, JobEngineConfig config) {
@@ -99,6 +135,7 @@ public class CubingJob extends DefaultChainedExecutable {
         format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
         result.setDeployEnvName(kylinConfig.getDeployEnv());
         result.setProjectName(projList.get(0).getName());
+        result.setJobType(jobType);
         CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), 
result.getParams());
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() 
+ " - " + seg.getName() + " - "
@@ -128,6 +165,14 @@ public class CubingJob extends DefaultChainedExecutable {
         return getParam(PROJECT_INSTANCE_NAME);
     }
 
+    public String getJobType() {
+        return getParam(JOB_TYPE);
+    }
+
+    void setJobType(String jobType) {
+        setParam(JOB_TYPE, jobType);
+    }
+
     @Override
     protected Pair<String, String> formatNotifications(ExecutableContext 
context, ExecutableState state) {
         CubeInstance cubeInstance = 
CubeManager.getInstance(context.getConfig())
@@ -197,6 +242,53 @@ public class CubingJob extends DefaultChainedExecutable {
         super.onExecuteFinished(result, executableContext);
     }
 
+    protected void onStatusChange(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
+        super.onStatusChange(context, result, state);
+
+        /**
+         * report job related metrics
+         */
+        if (state == ExecutableState.SUCCEED) {
+            JobRecordEventWrapper jobRecordEventWrapper = new 
JobRecordEventWrapper(
+                    new 
RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()));
+            
jobRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+                    CubingExecutableUtil.getCubeName(getParams()), getId(), 
getJobType(),
+                    getAlgorithm() == null ? "NULL" : 
getAlgorithm().toString());
+            long tableSize = findSourceSizeBytes();
+            long buildDuration = getDuration();
+            long waitResourceTime = getMapReduceWaitTime();
+            jobRecordEventWrapper.setStats(tableSize, findCubeSizeBytes(), 
buildDuration, waitResourceTime,
+                    getPerBytesTimeCost(tableSize, buildDuration - 
waitResourceTime));
+            if (CubingJobTypeEnum.getByName(getJobType()) == 
CubingJobTypeEnum.BUILD) {
+                jobRecordEventWrapper.setStepStats(
+                        
getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(),
 //
+                        
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(), //
+                        
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(), //
+                        
getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration());
+            }
+            
MetricsManager.getInstance().update(jobRecordEventWrapper.getMetricsRecord());
+        } else if (state == ExecutableState.ERROR) {
+            ExceptionRecordEventWrapper exceptionRecordEventWrapper = new 
ExceptionRecordEventWrapper(
+                    new 
RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()));
+            
exceptionRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
+                    CubingExecutableUtil.getCubeName(getParams()), getId(), 
getJobType(),
+                    getAlgorithm() == null ? "NULL" : 
getAlgorithm().toString(),
+                    result.getThrowable() != null ? 
result.getThrowable().getClass() : Exception.class);
+            
MetricsManager.getInstance().update(exceptionRecordEventWrapper.getMetricsRecord());
+        }
+
+    }
+
+    private double getPerBytesTimeCost(long size, long timeCost) {
+        if (size <= 0) {
+            return 0;
+        }
+        if (size < MIN_SOURCE_SIZE) {
+            size = MIN_SOURCE_SIZE;
+        }
+        return timeCost * 1.0 / size;
+    }
+
     /**
      * build fail because the metadata store has problem.
      * @param exception

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
index ce19500..ddbcc99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.engine.mr.exception.HadoopShellException;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -68,13 +69,14 @@ public class HadoopShellExecutable extends AbstractExecutable {
                 result = 2;
             }
             log.append("result code:").append(result);
-            return result == 0 ? new 
ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new 
ExecuteResult(ExecuteResult.State.FAILED, log.toString());
+            return result == 0 ? new 
ExecuteResult(ExecuteResult.State.SUCCEED, log.toString())
+                    : new ExecuteResult(ExecuteResult.State.FAILED, new 
HadoopShellException(log.toString()));
         } catch (ReflectiveOperationException e) {
             logger.error("error getMapReduceJobClass, class name:" + 
getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         } catch (Exception e) {
             logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 07efb34..a7363be 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.exception.MapReduceException;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -131,7 +132,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     ex.printStackTrace(new PrintWriter(stringWriter));
                     log.append(stringWriter.toString()).append("\n");
                     log.append("result code:").append(2);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, 
log.toString());
+                    return new ExecuteResult(ex, log.toString());
                 }
                 job = hadoopJob.getJob();
             }
@@ -168,7 +169,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     if (status == JobStepStatusEnum.FINISHED) {
                         return new ExecuteResult(ExecuteResult.State.SUCCEED, 
output.toString());
                     } else {
-                        return new ExecuteResult(ExecuteResult.State.FAILED, 
output.toString());
+                        return new ExecuteResult(ExecuteResult.State.FAILED, 
new MapReduceException(output.toString()));
                     }
                 }
                 
Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000L);
@@ -191,10 +192,10 @@ public class MapReduceExecutable extends AbstractExecutable {
 
         } catch (ReflectiveOperationException e) {
             logger.error("error getMapReduceJobClass, class name:" + 
getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         } catch (Exception e) {
             logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
new file mode 100644
index 0000000..23d4a3a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/HadoopShellException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class HadoopShellException extends Exception {
+
+    public HadoopShellException() {
+    }
+
+    public HadoopShellException(String message) {
+        super(message);
+    }
+
+    public HadoopShellException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public HadoopShellException(Throwable cause) {
+        super(cause);
+    }
+
+    public HadoopShellException(String message, Throwable cause, boolean 
enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
new file mode 100644
index 0000000..fc047fe
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/MapReduceException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class MapReduceException extends Exception {
+
+    public MapReduceException() {
+    }
+
+    public MapReduceException(String message) {
+        super(message);
+    }
+
+    public MapReduceException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MapReduceException(Throwable cause) {
+        super(cause);
+    }
+
+    public MapReduceException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
new file mode 100644
index 0000000..3e8b59e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/exception/SegmentNotFoundException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.exception;
+
+/**
+ */
+public class SegmentNotFoundException extends Exception {
+
+    public SegmentNotFoundException() {
+    }
+
+    public SegmentNotFoundException(String message) {
+        super(message);
+    }
+
+    public SegmentNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SegmentNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+    public SegmentNotFoundException(String message, Throwable cause, boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 4ca132c..bc1e638 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -74,7 +74,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 04d8231..ccebbb2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -136,7 +136,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to merge cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 28f99fb..7544188 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -80,7 +80,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to save cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 2efd718..f44520f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -75,7 +75,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index add5c42..86e6080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -51,7 +52,8 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
         CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
         if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
+            return new ExecuteResult(ExecuteResult.State.FAILED, new SegmentNotFoundException(
+                    "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams())));
         }
 
         CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
@@ -60,7 +62,8 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         // collect source statistics
         List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
         if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
+            return new ExecuteResult(ExecuteResult.State.FAILED,
+                    new SegmentNotFoundException("there are no merging segments"));
         }
         long sourceCount = 0L;
         long sourceSize = 0L;
@@ -82,7 +85,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED);
         } catch (IOException e) {
             logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1e032c6..f3d07a2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -131,7 +131,7 @@ public class SparkExecutable extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
         } catch (Exception e) {
             logger.error("error run spark job:", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index dc85c52..476068b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -73,7 +73,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
 
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            return new ExecuteResult(e, stepLogger.getBufferedLog());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4747cb9..8a4af9b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -53,6 +53,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.hive.exception.SegmentEmptyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -318,7 +319,8 @@ public class HiveMRInput implements IMRInput {
                 if (rowCount == 0) {
                     if (!config.isEmptySegmentAllowed()) {
                         stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
-                        return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                        return new ExecuteResult(ExecuteResult.State.ERROR,
+                                new SegmentEmptyException(stepLogger.getBufferedLog()));
                     } else {
                         return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
                     }
@@ -339,7 +341,7 @@ public class HiveMRInput implements IMRInput {
 
             } catch (Exception e) {
                 logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                return new ExecuteResult(e, stepLogger.getBufferedLog());
             }
         }
 
@@ -381,7 +383,7 @@ public class HiveMRInput implements IMRInput {
                 //output.append(cleanUpHiveViewIntermediateTable(config));
             } catch (IOException e) {
                 logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                return new ExecuteResult(e, e.getMessage());
             }
 
             return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java b/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
new file mode 100644
index 0000000..10a11c0
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/exception/SegmentEmptyException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive.exception;
+
+/**
+ */
+public class SegmentEmptyException extends Exception {
+
+    public SegmentEmptyException() {
+    }
+
+    public SegmentEmptyException(String message) {
+        super(message);
+    }
+
+    public SegmentEmptyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SegmentEmptyException(Throwable cause) {
+        super(cause);
+    }
+
+    public SegmentEmptyException(String message, Throwable cause, boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
index 63593c0..4025457 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
@@ -62,7 +62,7 @@ public class CmdStep extends AbstractExecutable {
 
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            return new ExecuteResult(e, stepLogger.getBufferedLog());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
index 8a6c90f..a67dc37 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
@@ -62,7 +62,7 @@ public class HiveCmdStep extends AbstractExecutable {
 
         } catch (Exception e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            return new ExecuteResult(e, stepLogger.getBufferedLog());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 5bce4e7..c6914c3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -222,7 +222,7 @@ public class KafkaMRInput implements IMRInput {
                 rmdirOnHDFS(getDataPath());
             } catch (IOException e) {
                 logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                return new ExecuteResult(e, e.getMessage());
             }
 
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 914fca2..2f5e492 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -78,7 +78,7 @@ public class MergeOffsetStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
             logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            return new ExecuteResult(e, e.getLocalizedMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index df3cf08..c454e60 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -78,7 +78,7 @@ public class DeprecatedGCStep extends AbstractExecutable {
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
-            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+            return new ExecuteResult(e, output.toString());
         }
 
         return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 86e8e6b..b9b5fe6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -67,7 +67,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
-            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+            return new ExecuteResult(e, output.toString());
         }
 
         return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/83d74c2e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 2f7e164..7371e08 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -95,7 +95,7 @@ public class MergeGCStep extends AbstractExecutable {
             } catch (IOException e) {
                 output.append("Got error when drop HBase table, exiting... \n");
                 // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+                return new ExecuteResult(e, output.append(e.getLocalizedMessage()).toString());
             } finally {
                 if (admin != null)
                     try {

Reply via email to