This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ab720d  Revert "KYLIN-3617 Use job's cache in job scheduler"
2ab720d is described below

commit 2ab720d7f5cfae7d734d33d02a3b8969fc52486b
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Thu Oct 18 10:55:48 2018 +0800

    Revert "KYLIN-3617 Use job's cache in job scheduler"
    
    This reverts commit 52307bab46db6240169492f240a8b69bde5110d8.
---
 .../org/apache/kylin/job/dao/ExecutableDao.java    | 23 ----------------------
 .../kylin/job/execution/ExecutableManager.java     | 18 -----------------
 .../job/impl/threadpool/DefaultFetcherRunner.java  | 14 ++++++-------
 .../job/impl/threadpool/DefaultScheduler.java      |  7 ++-----
 .../job/impl/threadpool/PriorityFetcherRunner.java | 14 ++++++-------
 5 files changed, 16 insertions(+), 60 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java 
b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 8352005..0cc6c8e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -242,10 +241,6 @@ public class ExecutableDao {
         }
     }
 
-    public ExecutableOutputPO getJobOutputDigest(String uuid) {
-        return executableOutputDigestMap.get(uuid);
-    }
-
     public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long 
timeEndExclusive) {
         List<ExecutableOutputPO> jobOutputDigests = Lists.newArrayList();
         for (ExecutableOutputPO po : executableOutputDigestMap.values()) {
@@ -273,10 +268,6 @@ public class ExecutableDao {
         }
     }
 
-    public ExecutablePO getJobDigest(String uuid) {
-        return executableDigestMap.get(uuid);
-    }
-
     public List<ExecutablePO> getJobDigests(long timeStart, long 
timeEndExclusive) {
         List<ExecutablePO> jobDigests = Lists.newArrayList();
         for (ExecutablePO po : executableDigestMap.values()) {
@@ -286,11 +277,6 @@ public class ExecutableDao {
         return jobDigests;
     }
 
-    public List<String> getJobIdsInCache() {
-        Set<String> idSet = executableDigestMap.keySet();
-        return Lists.newArrayList(idSet);
-    }
-
     public List<String> getJobIds() throws PersistentException {
         try {
             NavigableSet<String> resources = 
store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
@@ -405,13 +391,4 @@ public class ExecutableDao {
             throw new PersistentException(e);
         }
     }
-
-    public void reloadAll() throws IOException {
-        try (AutoReadWriteLock.AutoLock lock = 
executableDigestMapLock.lockForWrite()) {
-            executableDigestCrud.reloadAll();
-        }
-        try (AutoReadWriteLock.AutoLock lock = 
executableOutputDigestMapLock.lockForWrite()) {
-            executableOutputDigestCrud.reloadAll();
-        }
-    }
 }
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index b866618..5cc8a0f 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -155,10 +155,6 @@ public class ExecutableManager {
         }
     }
 
-    public AbstractExecutable getJobDigest(String uuid) {
-        return parseTo(executableDao.getJobDigest(uuid));
-    }
-
     public Output getOutput(String uuid) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(uuid);
@@ -170,12 +166,6 @@ public class ExecutableManager {
         }
     }
 
-    public Output getOutputDigest(String uuid) {
-        final ExecutableOutputPO jobOutput = 
executableDao.getJobOutputDigest(uuid);
-        Preconditions.checkArgument(jobOutput != null, "there is no related 
output for job id:" + uuid);
-        return parseOutput(jobOutput);
-    }
-
     private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
         final DefaultOutput result = new DefaultOutput();
         result.setExtra(jobOutput.getInfo());
@@ -296,10 +286,6 @@ public class ExecutableManager {
         }
     }
 
-    public List<String> getAllJobIdsInCache() {
-        return executableDao.getJobIdsInCache();
-    }
-
     public void resumeAllRunningJobs() {
         try {
             final List<ExecutableOutputPO> jobOutputs = 
executableDao.getJobOutputs();
@@ -453,10 +439,6 @@ public class ExecutableManager {
         }
     }
 
-    public void reloadAll() throws IOException {
-        executableDao.reloadAll();
-    }
-
     public void forceKillJob(String jobId) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 877c0d0..e5f15fe 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -50,7 +50,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIdsInCache()) {
+            for (final String id : executableManager.getAllJobIds()) {
                 if (isJobPoolFull()) {
                     return;
                 }
@@ -60,16 +60,16 @@ public class DefaultFetcherRunner extends FetcherRunner {
                     continue;
                 }
 
-                final Output outputDigest = 
executableManager.getOutputDigest(id);
-                if ((outputDigest.getState() != ExecutableState.READY)) {
+                final Output output = executableManager.getOutput(id);
+                if ((output.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
+                    if (output.getState() == ExecutableState.SUCCEED) {
                         nSUCCEED++;
-                    } else if (outputDigest.getState() == 
ExecutableState.ERROR) {
+                    } else if (output.getState() == ExecutableState.ERROR) {
                         nError++;
-                    } else if (outputDigest.getState() == 
ExecutableState.DISCARDED) {
+                    } else if (output.getState() == ExecutableState.DISCARDED) 
{
                         nDiscarded++;
-                    } else if (outputDigest.getState() == 
ExecutableState.STOPPED) {
+                    } else if (output.getState() == ExecutableState.STOPPED) {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index bcd6c81..5dd2c7c 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -78,6 +78,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
     // 
============================================================================
 
     private JobLock jobLock;
+    private ExecutableManager executableManager;
     private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService jobPool;
@@ -94,10 +95,6 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         }
     }
 
-    public ExecutableManager getExecutableManager() {
-        return ExecutableManager.getInstance(jobEngineConfig.getConfig());
-    }
-
     public FetcherRunner getFetcherRunner() {
         return fetcher;
     }
@@ -162,6 +159,7 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
             throw new IllegalStateException("Cannot start job scheduler due to 
lack of job lock");
         }
 
+        executableManager = 
ExecutableManager.getInstance(jobEngineConfig.getConfig());
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1);
         int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
@@ -170,7 +168,6 @@ public class DefaultScheduler implements 
Scheduler<AbstractExecutable>, Connecti
         context = new DefaultContext(Maps.<String, Executable> 
newConcurrentMap(), jobEngineConfig.getConfig());
 
         logger.info("Staring resume all running jobs.");
-        ExecutableManager executableManager = getExecutableManager();
         executableManager.resumeAllRunningJobs();
         logger.info("Finishing resume all running jobs.");
 
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 1d13afd..b562fac 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -86,23 +86,23 @@ public class PriorityFetcherRunner extends FetcherRunner {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIdsInCache()) {
+            for (final String id : executableManager.getAllJobIds()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
                     nRunning++;
                     continue;
                 }
 
-                final Output outputDigest = 
executableManager.getOutputDigest(id);
-                if ((outputDigest.getState() != ExecutableState.READY)) {
+                final Output output = executableManager.getOutput(id);
+                if ((output.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
+                    if (output.getState() == ExecutableState.SUCCEED) {
                         nSUCCEED++;
-                    } else if (outputDigest.getState() == 
ExecutableState.ERROR) {
+                    } else if (output.getState() == ExecutableState.ERROR) {
                         nError++;
-                    } else if (outputDigest.getState() == 
ExecutableState.DISCARDED) {
+                    } else if (output.getState() == ExecutableState.DISCARDED) 
{
                         nDiscarded++;
-                    } else if (outputDigest.getState() == 
ExecutableState.STOPPED) {
+                    } else if (output.getState() == ExecutableState.STOPPED) {
                         nStopped++;
                     } else {
                         if (fetchFailed) {

Reply via email to