shaofengshi closed pull request #311: KYLIN-3647 Fix inconsistent states of job and its sub-task
URL: https://github.com/apache/kylin/pull/311
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index ad22abc322..b8d3144acc 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -99,7 +99,7 @@ private void onExecuteFinishedWithRetry(ExecuteResult result, ExecutableContext onExecuteFinished(result, executableContext); } catch (Exception e) { logger.error(nRetry + "th retries for onExecuteFinished fails due to {}", e); - if (isMetaDataPersistException(e)) { + if (isMetaDataPersistException(e, 5)) { exception = e; try { Thread.sleep(1000L * (long) Math.pow(4, nRetry)); @@ -211,14 +211,21 @@ protected void handleMetadataPersistException(ExecutableContext context, Throwab new MailService(context.getConfig()).sendMail(users, title, content); } - private boolean isMetaDataPersistException(Exception e) { + protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException; + + @Override + public void cleanup() throws ExecuteException { + + } + + public static boolean isMetaDataPersistException(Exception e, final int maxDepth) { if (e instanceof PersistentException) { return true; } Throwable t = e.getCause(); int depth = 0; - while (t != null && depth < 5) { + while (t != null && depth < maxDepth) { depth++; if (t instanceof PersistentException) { return true; @@ -228,13 +235,6 @@ private boolean isMetaDataPersistException(Exception e) { return false; } - protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException; - - @Override - public 
void cleanup() throws ExecuteException { - - } - @Override public boolean isRunnable() { return this.getStatus() == ExecutableState.READY; diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index ec660fd28b..b912ecc190 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -112,9 +112,9 @@ protected void onExecuteFinished(ExecuteResult result, ExecutableContext executa "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n" + "it might cause endless state, will retry to fetch subtask's state.", task.getId(), task.getName()); - boolean retryRet = retryFetchTaskStatus(task); - if (false == retryRet) - hasError = true; + getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null, + "killed due to inconsistent state"); + hasError = true; } final ExecutableState status = task.getStatus(); @@ -175,34 +175,6 @@ public void addTask(AbstractExecutable executable) { this.subTasks.add(executable); } - private boolean retryFetchTaskStatus(Executable task) { - boolean hasRunning = false; - int retry = 1; - while (retry <= 10) { - ExecutableState retryState = task.getStatus(); - if (retryState == ExecutableState.RUNNING) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - logger.error("Failed to Sleep: ", e); - } - hasRunning = true; - logger.error("With {} times retry, it's state is still RUNNING", retry); - } else { - logger.info("With {} times retry, status is changed to: {}", retry, retryState); - hasRunning = false; - break; - } - retry++; - } - if (hasRunning) { - logger.error("Parent task: {} is finished, but it's subtask: {}'s state is still RUNNING \n" - + ", mark parent task failed.", getName(), task.getName()); - return false; - } - return true; - } - @Override 
public int getDefaultPriority() { return DEFAULT_PRIORITY; diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 5cc8a0f7d7..09b7b8e4bf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -442,7 +442,6 @@ public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, public void forceKillJob(String jobId) { try { final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - jobOutput.setStatus(ExecutableState.ERROR.toString()); List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks(); for (ExecutablePO task : tasks) { @@ -453,12 +452,31 @@ public void forceKillJob(String jobId) { } break; } - executableDao.updateJobOutput(jobOutput); + + if (!jobOutput.getStatus().equals(ExecutableState.ERROR.toString())) { + jobOutput.setStatus(ExecutableState.ERROR.toString()); + executableDao.updateJobOutput(jobOutput); + } } catch (PersistentException e) { throw new RuntimeException(e); } } + public void forceKillJobWithRetry(String jobId) { + boolean done = false; + + while (!done) { + try { + forceKillJob(jobId); + done = true; + } catch (RuntimeException e) { + if (!(e.getCause() instanceof PersistentException)) { + done = true; + } + } + } + } + //for migration only //TODO delete when migration finished public void resetJobOutput(String jobId, ExecutableState state, String output) { @@ -475,6 +493,10 @@ public void resetJobOutput(String jobId, ExecutableState state, String output) { } public void addJobInfo(String id, Map<String, String> info) { + if (Thread.currentThread().isInterrupted()) { + throw new RuntimeException("Current thread is interrupted, aborting"); + } + if (info == null) { return; } diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 5dd2c7c80d..3be8cc7613 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -115,6 +115,10 @@ public void run() { } catch (ExecuteException e) { logger.error("ExecuteException job:" + executable.getId(), e); } catch (Exception e) { + if (AbstractExecutable.isMetaDataPersistException(e, 5)) { + // Job fail due to PersistException + ExecutableManager.getInstance(jobEngineConfig.getConfig()).forceKillJobWithRetry(executable.getId()); + } logger.error("unknown error execute job:" + executable.getId(), e); } finally { context.removeRunningJob(executable); diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java new file mode 100644 index 0000000000..78b393c77f --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +public class PersistExceptionExecutable extends BaseTestExecutable { + public PersistExceptionExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws PersistentException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + + throw new PersistentException("persistent exception"); + } +} diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index 9d4c5753d8..544a5c4d52 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -150,7 +150,7 @@ public void testIllegalState() throws Exception { waitForJobFinish(job.getId(), 10000); Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.RUNNING, execMgr.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState()); } @SuppressWarnings("rawtypes") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services