This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 2ab720d Revert "KYLIN-3617 Use job's cache in job scheduler" 2ab720d is described below commit 2ab720d7f5cfae7d734d33d02a3b8969fc52486b Author: shaofengshi <shaofeng...@apache.org> AuthorDate: Thu Oct 18 10:55:48 2018 +0800 Revert "KYLIN-3617 Use job's cache in job scheduler" This reverts commit 52307bab46db6240169492f240a8b69bde5110d8. --- .../org/apache/kylin/job/dao/ExecutableDao.java | 23 ---------------------- .../kylin/job/execution/ExecutableManager.java | 18 ----------------- .../job/impl/threadpool/DefaultFetcherRunner.java | 14 ++++++------- .../job/impl/threadpool/DefaultScheduler.java | 7 ++----- .../job/impl/threadpool/PriorityFetcherRunner.java | 14 ++++++------- 5 files changed, 16 insertions(+), 60 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 8352005..0cc6c8e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NavigableSet; -import java.util.Set; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -242,10 +241,6 @@ public class ExecutableDao { } } - public ExecutableOutputPO getJobOutputDigest(String uuid) { - return executableOutputDigestMap.get(uuid); - } - public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long timeEndExclusive) { List<ExecutableOutputPO> jobOutputDigests = Lists.newArrayList(); for (ExecutableOutputPO po : executableOutputDigestMap.values()) { @@ -273,10 +268,6 @@ public class ExecutableDao { } } - public ExecutablePO getJobDigest(String uuid) { - return executableDigestMap.get(uuid); - } - public List<ExecutablePO> getJobDigests(long timeStart, long timeEndExclusive) { List<ExecutablePO> jobDigests = Lists.newArrayList(); for (ExecutablePO po : executableDigestMap.values()) { @@ -286,11 +277,6 @@ public class ExecutableDao { return jobDigests; } - public List<String> getJobIdsInCache() { - Set<String> idSet = executableDigestMap.keySet(); - return Lists.newArrayList(idSet); - } - public List<String> getJobIds() throws PersistentException { try { NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); @@ -405,13 +391,4 @@ public class ExecutableDao { throw new PersistentException(e); } } - - public void reloadAll() throws IOException { - try (AutoReadWriteLock.AutoLock lock = executableDigestMapLock.lockForWrite()) { - executableDigestCrud.reloadAll(); - } - try (AutoReadWriteLock.AutoLock lock = executableOutputDigestMapLock.lockForWrite()) { - executableOutputDigestCrud.reloadAll(); - } - } } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index b866618..5cc8a0f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -155,10 +155,6 @@ public class ExecutableManager { } } - public AbstractExecutable getJobDigest(String uuid) { - return parseTo(executableDao.getJobDigest(uuid)); - } - public Output getOutput(String uuid) { try { final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid); @@ -170,12 +166,6 @@ public class ExecutableManager { } } - public Output getOutputDigest(String uuid) { - final ExecutableOutputPO jobOutput = executableDao.getJobOutputDigest(uuid); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid); - return parseOutput(jobOutput); - } - private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { final DefaultOutput result = new DefaultOutput(); result.setExtra(jobOutput.getInfo()); @@ -296,10 +286,6 @@ public class ExecutableManager { } } - public List<String> getAllJobIdsInCache() { - return executableDao.getJobIdsInCache(); - } - public void resumeAllRunningJobs() { try { final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); @@ -453,10 +439,6 @@ public class ExecutableManager { } } - public void reloadAll() throws IOException { - executableDao.reloadAll(); - } - public void forceKillJob(String jobId) { try { final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java index 877c0d0..e5f15fe 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java @@ -50,7 +50,7 @@ public class DefaultFetcherRunner extends FetcherRunner { } int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; - for (final String id : executableManager.getAllJobIdsInCache()) { + for (final String id : executableManager.getAllJobIds()) { if (isJobPoolFull()) { return; } @@ -60,16 +60,16 @@ public class DefaultFetcherRunner extends FetcherRunner { continue; } - final Output outputDigest = executableManager.getOutputDigest(id); - if ((outputDigest.getState() != ExecutableState.READY)) { + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { // logger.debug("Job id:" + id + " not runnable"); - if (outputDigest.getState() == ExecutableState.SUCCEED) { + if (output.getState() == ExecutableState.SUCCEED) { nSUCCEED++; - } else if (outputDigest.getState() == ExecutableState.ERROR) { + } else if (output.getState() == ExecutableState.ERROR) { nError++; - } else if (outputDigest.getState() == ExecutableState.DISCARDED) { + } else if (output.getState() == ExecutableState.DISCARDED) { nDiscarded++; - } else if (outputDigest.getState() == ExecutableState.STOPPED) { + } else if (output.getState() == ExecutableState.STOPPED) { nStopped++; } else { if (fetchFailed) { diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index bcd6c81..5dd2c7c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -78,6 +78,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti // ============================================================================ private JobLock jobLock; + private ExecutableManager executableManager; private FetcherRunner fetcher; private ScheduledExecutorService fetcherPool; private ExecutorService jobPool; @@ -94,10 +95,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } - public ExecutableManager getExecutableManager() { - return ExecutableManager.getInstance(jobEngineConfig.getConfig()); - } - public FetcherRunner getFetcherRunner() { return fetcher; } @@ -162,6 +159,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti throw new IllegalStateException("Cannot start job scheduler due to lack of job lock"); } + executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig()); //load all executable, set them to a consistent status fetcherPool = Executors.newScheduledThreadPool(1); int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); @@ -170,7 +168,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); logger.info("Staring resume all running jobs."); - ExecutableManager executableManager = getExecutableManager(); executableManager.resumeAllRunningJobs(); logger.info("Finishing resume all running jobs."); diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java index 1d13afd..b562fac 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java @@ -86,23 +86,23 @@ public class PriorityFetcherRunner extends FetcherRunner { } int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; - for (final String id : executableManager.getAllJobIdsInCache()) { + for (final String id : executableManager.getAllJobIds()) { if (runningJobs.containsKey(id)) { // logger.debug("Job id:" + id + " is already running"); nRunning++; continue; } - final Output outputDigest = executableManager.getOutputDigest(id); - if ((outputDigest.getState() != ExecutableState.READY)) { + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { // logger.debug("Job id:" + id + " not runnable"); - if (outputDigest.getState() == ExecutableState.SUCCEED) { + if (output.getState() == ExecutableState.SUCCEED) { nSUCCEED++; - } else if (outputDigest.getState() == ExecutableState.ERROR) { + } else if (output.getState() == ExecutableState.ERROR) { nError++; - } else if (outputDigest.getState() == ExecutableState.DISCARDED) { + } else if (output.getState() == ExecutableState.DISCARDED) { nDiscarded++; - } else if (outputDigest.getState() == ExecutableState.STOPPED) { + } else if (output.getState() == ExecutableState.STOPPED) { nStopped++; } else { if (fetchFailed) {