shaofengshi closed pull request #311: KYLIN-3647 Fix inconsistent states of job 
and its sub-task
URL: https://github.com/apache/kylin/pull/311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index ad22abc322..b8d3144acc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -99,7 +99,7 @@ private void onExecuteFinishedWithRetry(ExecuteResult result, 
ExecutableContext
                 onExecuteFinished(result, executableContext);
             } catch (Exception e) {
                 logger.error(nRetry + "th retries for onExecuteFinished fails 
due to {}", e);
-                if (isMetaDataPersistException(e)) {
+                if (isMetaDataPersistException(e, 5)) {
                     exception = e;
                     try {
                         Thread.sleep(1000L * (long) Math.pow(4, nRetry));
@@ -211,14 +211,21 @@ protected void 
handleMetadataPersistException(ExecutableContext context, Throwab
         new MailService(context.getConfig()).sendMail(users, title, content);
     }
 
-    private boolean isMetaDataPersistException(Exception e) {
+    protected abstract ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException, PersistentException;
+
+    @Override
+    public void cleanup() throws ExecuteException {
+
+    }
+
+    public static boolean isMetaDataPersistException(Exception e, final int 
maxDepth) {
         if (e instanceof PersistentException) {
             return true;
         }
 
         Throwable t = e.getCause();
         int depth = 0;
-        while (t != null && depth < 5) {
+        while (t != null && depth < maxDepth) {
             depth++;
             if (t instanceof PersistentException) {
                 return true;
@@ -228,13 +235,6 @@ private boolean isMetaDataPersistException(Exception e) {
         return false;
     }
 
-    protected abstract ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException;
-
-    @Override
-    public void cleanup() throws ExecuteException {
-
-    }
-
     @Override
     public boolean isRunnable() {
         return this.getStatus() == ExecutableState.READY;
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index ec660fd28b..b912ecc190 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -112,9 +112,9 @@ protected void onExecuteFinished(ExecuteResult result, 
ExecutableContext executa
                             "There shouldn't be a running subtask[jobId: {}, 
jobName: {}], \n"
                                     + "it might cause endless state, will 
retry to fetch subtask's state.",
                             task.getId(), task.getName());
-                    boolean retryRet = retryFetchTaskStatus(task);
-                    if (false == retryRet)
-                        hasError = true;
+                    getManager().updateJobOutput(task.getId(), 
ExecutableState.ERROR, null,
+                            "killed due to inconsistent state");
+                    hasError = true;
                 }
 
                 final ExecutableState status = task.getStatus();
@@ -175,34 +175,6 @@ public void addTask(AbstractExecutable executable) {
         this.subTasks.add(executable);
     }
 
-    private boolean retryFetchTaskStatus(Executable task) {
-        boolean hasRunning = false;
-        int retry = 1;
-        while (retry <= 10) {
-            ExecutableState retryState = task.getStatus();
-            if (retryState == ExecutableState.RUNNING) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    logger.error("Failed to Sleep: ", e);
-                }
-                hasRunning = true;
-                logger.error("With {} times retry, it's state is still 
RUNNING", retry);
-            } else {
-                logger.info("With {} times retry, status is changed to: {}", 
retry, retryState);
-                hasRunning = false;
-                break;
-            }
-            retry++;
-        }
-        if (hasRunning) {
-            logger.error("Parent task: {} is finished, but it's subtask: {}'s 
state is still RUNNING \n"
-                    + ", mark parent task failed.", getName(), task.getName());
-            return false;
-        }
-        return true;
-    }
-
     @Override
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 5cc8a0f7d7..09b7b8e4bf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -442,7 +442,6 @@ public void updateJobOutput(String jobId, ExecutableState 
newStatus, Map<String,
     public void forceKillJob(String jobId) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
-            jobOutput.setStatus(ExecutableState.ERROR.toString());
             List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks();
 
             for (ExecutablePO task : tasks) {
@@ -453,12 +452,31 @@ public void forceKillJob(String jobId) {
                 }
                 break;
             }
-            executableDao.updateJobOutput(jobOutput);
+
+            if 
(!jobOutput.getStatus().equals(ExecutableState.ERROR.toString())) {
+                jobOutput.setStatus(ExecutableState.ERROR.toString());
+                executableDao.updateJobOutput(jobOutput);
+            }
         } catch (PersistentException e) {
             throw new RuntimeException(e);
         }
     }
 
+    public void forceKillJobWithRetry(String jobId) {
+        boolean done = false;
+
+        while (!done) {
+            try {
+                forceKillJob(jobId);
+                done = true;
+            } catch (RuntimeException e) {
+                if (!(e.getCause() instanceof PersistentException)) {
+                    done = true;
+                }
+            }
+        }
+    }
+
     //for migration only
     //TODO delete when migration finished
     public void resetJobOutput(String jobId, ExecutableState state, String 
output) {
@@ -475,6 +493,10 @@ public void resetJobOutput(String jobId, ExecutableState 
state, String output) {
     }
 
     public void addJobInfo(String id, Map<String, String> info) {
+        if (Thread.currentThread().isInterrupted()) {
+            throw new RuntimeException("Current thread is interrupted, 
aborting");
+        }
+
         if (info == null) {
             return;
         }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 5dd2c7c80d..3be8cc7613 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -115,6 +115,10 @@ public void run() {
             } catch (ExecuteException e) {
                 logger.error("ExecuteException job:" + executable.getId(), e);
             } catch (Exception e) {
+                if (AbstractExecutable.isMetaDataPersistException(e, 5)) {
+                    // Job fail due to PersistException
+                    
ExecutableManager.getInstance(jobEngineConfig.getConfig()).forceKillJobWithRetry(executable.getId());
+                }
                 logger.error("unknown error execute job:" + 
executable.getId(), e);
             } finally {
                 context.removeRunningJob(executable);
diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
new file mode 100644
index 0000000000..78b393c77f
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+public class PersistExceptionExecutable extends BaseTestExecutable {
+    public PersistExceptionExecutable() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws 
PersistentException {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+
+        throw new PersistentException("persistent exception");
+    }
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 9d4c5753d8..544a5c4d52 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -150,7 +150,7 @@ public void testIllegalState() throws Exception {
         waitForJobFinish(job.getId(), 10000);
         Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
         Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.RUNNING, 
execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(task2.getId()).getState());
     }
 
     @SuppressWarnings("rawtypes")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to