[ 
https://issues.apache.org/jira/browse/KYLIN-3617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713962#comment-16713962
 ] 

ASF GitHub Bot commented on KYLIN-3617:
---------------------------------------

shaofengshi closed pull request #304: KYLIN-3617 Use job's cache in job scheduler
URL: https://github.com/apache/kylin/pull/304
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java 
b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 0cc6c8e5fb..4e7188a71c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -241,6 +242,10 @@ private long writeJobOutputResource(String path, 
ExecutableOutputPO output) thro
         }
     }
 
+    public ExecutableOutputPO getJobOutputDigest(String uuid) {
+        return executableOutputDigestMap.get(uuid);
+    }
+
     public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long 
timeEndExclusive) {
         List<ExecutableOutputPO> jobOutputDigests = Lists.newArrayList();
         for (ExecutableOutputPO po : executableOutputDigestMap.values()) {
@@ -268,6 +273,10 @@ private long writeJobOutputResource(String path, 
ExecutableOutputPO output) thro
         }
     }
 
+    public ExecutablePO getJobDigest(String uuid) {
+        return executableDigestMap.get(uuid);
+    }
+
     public List<ExecutablePO> getJobDigests(long timeStart, long 
timeEndExclusive) {
         List<ExecutablePO> jobDigests = Lists.newArrayList();
         for (ExecutablePO po : executableDigestMap.values()) {
@@ -277,6 +286,11 @@ private long writeJobOutputResource(String path, 
ExecutableOutputPO output) thro
         return jobDigests;
     }
 
+    public List<String> getJobIdsInCache() {
+        Set<String> idSet = executableDigestMap.keySet();
+        return Lists.newArrayList(idSet);
+    }
+
     public List<String> getJobIds() throws PersistentException {
         try {
             NavigableSet<String> resources = 
store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
@@ -391,4 +405,33 @@ public void deleteJobOutput(String uuid) throws 
PersistentException {
             throw new PersistentException(e);
         }
     }
+
+    public void reloadAll() throws IOException {
+        try (AutoReadWriteLock.AutoLock lock = 
executableDigestMapLock.lockForWrite()) {
+            executableDigestCrud.reloadAll();
+        }
+        try (AutoReadWriteLock.AutoLock lock = 
executableOutputDigestMapLock.lockForWrite()) {
+            executableOutputDigestCrud.reloadAll();
+        }
+    }
+
+    public void syncDigestsOfJob(String uuid) throws PersistentException {
+        ExecutablePO job = getJob(uuid);
+        ExecutablePO jobDigest = getJobDigest(uuid);
+
+        if (job == null && jobDigest != null) {
+            executableDigestMap.remove(uuid);
+        } else if (job != null && jobDigest == null) {
+            executableDigestMap.put(uuid, job);
+        }
+
+        ExecutableOutputPO jobOutput = getJobOutput(uuid);
+        ExecutableOutputPO jobOutputDigest = getJobOutputDigest(uuid);
+
+        if (jobOutput == null && jobOutputDigest != null) {
+            executableOutputDigestMap.remove(uuid);
+        } else if (jobOutput != null && jobOutputDigest == null) {
+            executableOutputDigestMap.put(uuid, jobOutput);
+        }
+    }
 }
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 5cc8a0f7d7..2510cf9e11 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -155,6 +155,14 @@ public AbstractExecutable getJob(String uuid) {
         }
     }
 
+    public AbstractExecutable getJobDigest(String uuid) {
+        return parseTo(executableDao.getJobDigest(uuid));
+    }
+
+    public void syncDigestsOfJob(String uuid) throws PersistentException {
+        executableDao.syncDigestsOfJob(uuid);
+    }
+
     public Output getOutput(String uuid) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(uuid);
@@ -166,6 +174,12 @@ public Output getOutput(String uuid) {
         }
     }
 
+    public Output getOutputDigest(String uuid) {
+        final ExecutableOutputPO jobOutput = 
executableDao.getJobOutputDigest(uuid);
+        Preconditions.checkArgument(jobOutput != null, "there is no related 
output for job id:" + uuid);
+        return parseOutput(jobOutput);
+    }
+
     private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
         final DefaultOutput result = new DefaultOutput();
         result.setExtra(jobOutput.getInfo());
@@ -286,6 +300,10 @@ public void updateAllRunningJobsToError() {
         }
     }
 
+    public List<String> getAllJobIdsInCache() {
+        return executableDao.getJobIdsInCache();
+    }
+
     public void resumeAllRunningJobs() {
         try {
             final List<ExecutableOutputPO> jobOutputs = 
executableDao.getJobOutputs();
@@ -439,6 +457,10 @@ public void updateJobOutput(String jobId, ExecutableState 
newStatus, Map<String,
         }
     }
 
+    public void reloadAll() throws IOException {
+        executableDao.reloadAll();
+    }
+
     public void forceKillJob(String jobId) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index e5f15fe49d..877c0d01d0 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -50,7 +50,7 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIds()) {
+            for (final String id : executableManager.getAllJobIdsInCache()) {
                 if (isJobPoolFull()) {
                     return;
                 }
@@ -60,16 +60,16 @@ synchronized public void run() {
                     continue;
                 }
 
-                final Output output = executableManager.getOutput(id);
-                if ((output.getState() != ExecutableState.READY)) {
+                final Output outputDigest = 
executableManager.getOutputDigest(id);
+                if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (output.getState() == ExecutableState.SUCCEED) {
+                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
                         nSUCCEED++;
-                    } else if (output.getState() == ExecutableState.ERROR) {
+                    } else if (outputDigest.getState() == 
ExecutableState.ERROR) {
                         nError++;
-                    } else if (output.getState() == ExecutableState.DISCARDED) 
{
+                    } else if (outputDigest.getState() == 
ExecutableState.DISCARDED) {
                         nDiscarded++;
-                    } else if (output.getState() == ExecutableState.STOPPED) {
+                    } else if (outputDigest.getState() == 
ExecutableState.STOPPED) {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 5dd2c7c80d..bcd6c81844 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -78,7 +78,6 @@ public static synchronized void destroyInstance() {
     // 
============================================================================
 
     private JobLock jobLock;
-    private ExecutableManager executableManager;
     private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService jobPool;
@@ -95,6 +94,10 @@ public DefaultScheduler() {
         }
     }
 
+    public ExecutableManager getExecutableManager() {
+        return ExecutableManager.getInstance(jobEngineConfig.getConfig());
+    }
+
     public FetcherRunner getFetcherRunner() {
         return fetcher;
     }
@@ -159,7 +162,6 @@ public synchronized void init(JobEngineConfig 
jobEngineConfig, JobLock lock) thr
             throw new IllegalStateException("Cannot start job scheduler due to 
lack of job lock");
         }
 
-        executableManager = 
ExecutableManager.getInstance(jobEngineConfig.getConfig());
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1);
         int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
@@ -168,6 +170,7 @@ public synchronized void init(JobEngineConfig 
jobEngineConfig, JobLock lock) thr
         context = new DefaultContext(Maps.<String, Executable> 
newConcurrentMap(), jobEngineConfig.getConfig());
 
         logger.info("Staring resume all running jobs.");
+        ExecutableManager executableManager = getExecutableManager();
         executableManager.resumeAllRunningJobs();
         logger.info("Finishing resume all running jobs.");
 
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index d6f9fe2c08..96ca9ee7bb 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -39,6 +39,7 @@
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -153,6 +154,13 @@ public void onUnlock(String path, String nodeData) {
             String[] paths = path.split("/");
             String jobId = paths[paths.length - 1];
 
+            // Sync execute cache in case broadcast not available
+            try {
+                executableManager.syncDigestsOfJob(jobId);
+            } catch (PersistentException e) {
+                logger.error("Failed to sync cache of job: " + jobId + ", at 
server: " + serverName);
+            }
+
             final Output output = executableManager.getOutput(jobId);
             if (output.getState() == ExecutableState.RUNNING) {
                 AbstractExecutable executable = 
executableManager.getJob(jobId);
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index b562fac8c1..1d13afdcf8 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -86,23 +86,23 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIds()) {
+            for (final String id : executableManager.getAllJobIdsInCache()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
                     nRunning++;
                     continue;
                 }
 
-                final Output output = executableManager.getOutput(id);
-                if ((output.getState() != ExecutableState.READY)) {
+                final Output outputDigest = 
executableManager.getOutputDigest(id);
+                if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
-                    if (output.getState() == ExecutableState.SUCCEED) {
+                    if (outputDigest.getState() == ExecutableState.SUCCEED) {
                         nSUCCEED++;
-                    } else if (output.getState() == ExecutableState.ERROR) {
+                    } else if (outputDigest.getState() == 
ExecutableState.ERROR) {
                         nError++;
-                    } else if (output.getState() == ExecutableState.DISCARDED) 
{
+                    } else if (outputDigest.getState() == 
ExecutableState.DISCARDED) {
                         nDiscarded++;
-                    } else if (output.getState() == ExecutableState.STOPPED) {
+                    } else if (outputDigest.getState() == 
ExecutableState.STOPPED) {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java 
b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index be54dce735..c77592c25c 100644
--- 
a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -57,8 +57,8 @@
     static File localMetaDir;
     static String backup;
 
-    static final String jobId1 = "job1" + RandomUtil.randomUUID();
-    static final String jobId2 = "job2" + RandomUtil.randomUUID();
+    static final String jobId1 = RandomUtil.randomUUID().toString();
+    static final String jobId2 = RandomUtil.randomUUID().toString();
     static final String serverName1 = "serverName1";
     static final String serverName2 = "serverName2";
     static final String confDstPath1 = 
"target/kylin_metadata_dist_lock_test1/kylin.properties";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Reduce number of visiting metastore for job scheduler
> -----------------------------------------------------
>
>                 Key: KYLIN-3617
>                 URL: https://issues.apache.org/jira/browse/KYLIN-3617
>             Project: Kylin
>          Issue Type: Improvement
>          Components: Job Engine
>    Affects Versions: v2.4.1
>            Reporter: nichunen
>            Assignee: nichunen
>            Priority: Major
>             Fix For: v2.6.0
>
>         Attachments: image-2018-12-09-20-22-51-336.png, 
> image-2018-12-09-20-24-30-590.png, image-2018-12-09-20-25-40-265.png, 
> image-2018-12-09-20-26-30-105.png, image-2018-12-09-20-26-40-478.png
>
>
> Since KYLIN-3470 introduced a cache for jobs' metadata, that cache can also be used in 
> the job scheduler to reduce the pressure on the metastore.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to