Repository: kylin
Updated Branches:
  refs/heads/yaho-cube-planner 0b12df776 -> d216eba3e


APACHE-KYLIN-2735: Introduce an option to make job scheduler consider job priority


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d216eba3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d216eba3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d216eba3

Branch: refs/heads/yaho-cube-planner
Commit: d216eba3e10af99cb58b9ca45582d2bf5c8a0a37
Parents: 0b12df7
Author: Zhong <nju_y...@apache.org>
Authored: Thu Aug 31 14:56:19 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Thu Aug 31 14:56:19 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   8 ++
 .../kylin/job/engine/JobEngineConfig.java       |  14 +++
 .../kylin/job/execution/AbstractExecutable.java |   9 ++
 .../job/execution/CheckpointExecutable.java     |   7 ++
 .../job/execution/DefaultChainedExecutable.java |   7 ++
 .../job/impl/threadpool/DefaultScheduler.java   | 111 ++++++++++++++++++-
 .../org/apache/kylin/engine/mr/CubingJob.java   |  19 +++-
 7 files changed, 171 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d66d7ce..53bb2ad 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -503,6 +503,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.scheduler.default", "0"));
     }
 
+    public boolean getSchedulerPriorityConsidered() {
+        return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", "false"));
+    }
+
+    public Integer getSchedulerPriorityBarFetchFromQueue() {
+        return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20"));
+    }
+
     public Integer getSchedulerPollIntervalSecond() {
         return Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "30"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 6890557..9ba602f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -105,6 +105,20 @@ public class JobEngineConfig {
         return config;
     }
 
+    /**
+     * @return if consider job priority when scheduling jobs
+     * */
+    public boolean getJobPriorityConsidered() {
+        return config.getSchedulerPriorityConsidered();
+    }
+
+    /**
+     * @return the priority bar for fetching jobs from job priority queue
+     */
+    public int getFetchQueuePriorityBar() {
+        return config.getSchedulerPriorityBarFetchFromQueue();
+    }
+
     public String getHdfsWorkingDirectory() {
         return config.getHdfsWorkingDirectory();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 30b6421..a37cdc9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -44,6 +44,8 @@ import com.google.common.collect.Maps;
  */
 public abstract class AbstractExecutable implements Executable, Idempotent {
 
+    public static final Integer DEFAULT_PRIORITY = 10;
+
     protected static final String SUBMITTER = "submitter";
     protected static final String NOTIFY_LIST = "notify_list";
     protected static final String START_TIME = "startTime";
@@ -389,6 +391,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return output.getState() == ExecutableState.READY;
     }
 
+    /**
+     * The larger the value, the higher priority
+     * */
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY;
+    }
+
     /*
     * discarded is triggered by JobService, the Scheduler is not awake of that
     *

http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
index 9864400..db477cb 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -29,6 +29,8 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class);
 
+    public static final Integer DEFAULT_PRIORITY = 30;
+
     private static final String DEPLOY_ENV_NAME = "envName";
     private static final String PROJECT_INSTANCE_NAME = "projectName";
 
@@ -75,4 +77,9 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
     public void setProjectName(String name) {
         setParam(PROJECT_INSTANCE_NAME, name);
     }
+
+    @Override
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 0ade541..8a816ba 100755
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Maps;
  */
 public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable {
 
+    public static final Integer DEFAULT_PRIORITY = 10;
+
     private final List<AbstractExecutable> subTasks = Lists.newArrayList();
 
     public DefaultChainedExecutable() {
@@ -168,4 +170,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
         this.subTasks.add(executable);
     }
+
+    @Override
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 315671c..684bd5b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
+import java.util.Comparator;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -51,7 +54,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
     private JobLock jobLock;
     private ExecutableManager executableManager;
-    private FetcherRunner fetcher;
+    private Runnable fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService jobPool;
     private DefaultContext context;
@@ -69,6 +72,110 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         }
     }
 
+    private class FetcherRunnerWithPriority implements Runnable {
+        volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1,
+                new Comparator<Pair<AbstractExecutable, Integer>>() {
+                    @Override
+                    public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
+                        return o1.getSecond() > o2.getSecond() ? -1 : 1;
+                    }
+                });
+
+        private void addToJobPool(AbstractExecutable executable, int priority) {
+            String jobDesc = executable.toString();
+            logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
+            try {
+                context.addRunningJob(executable);
+                jobPool.execute(new JobRunner(executable));
+                logger.info(jobDesc + " scheduled");
+            } catch (Exception ex) {
+                context.removeRunningJob(executable);
+                logger.warn(jobDesc + " fail to schedule", ex);
+            }
+        }
+
+        @Override
+        synchronized public void run() {
+            try {
+                // logger.debug("Job Fetcher is running...");
+                Map<String, Executable> runningJobs = context.getRunningJobs();
+
+                // fetch job from jobPriorityQueue first to reduce chance to scan job list
+                Map<String, Integer> leftJobPriorities = Maps.newHashMap();
+                Pair<AbstractExecutable, Integer> executableWithPriority;
+                while ((executableWithPriority = jobPriorityQueue.peek()) != null
+                        // the priority of jobs in pendingJobPriorities should be above a threshold
+                        && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
+                    executableWithPriority = jobPriorityQueue.poll();
+                    AbstractExecutable executable = executableWithPriority.getFirst();
+                    int curPriority = executableWithPriority.getSecond();
+                    // the job should wait more than one time
+                    if (curPriority > executable.getDefaultPriority() + 1) {
+                        addToJobPool(executable, curPriority);
+                    } else {
+                        leftJobPriorities.put(executable.getId(), curPriority + 1);
+                    }
+                }
+
+                if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
+                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
+                    return;
+                }
+
+                while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
+                    leftJobPriorities.put(executableWithPriority.getFirst().getId(),
+                            executableWithPriority.getSecond() + 1);
+                }
+
+                int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+                for (final String id : executableManager.getAllJobIds()) {
+                    if (runningJobs.containsKey(id)) {
+                        // logger.debug("Job id:" + id + " is already running");
+                        nRunning++;
+                        continue;
+                    }
+
+                    AbstractExecutable executable = executableManager.getJob(id);
+                    if (!executable.isReady()) {
+                        final Output output = executableManager.getOutput(id);
+                        // logger.debug("Job id:" + id + " not runnable");
+                        if (output.getState() == ExecutableState.DISCARDED) {
+                            nDiscarded++;
+                        } else if (output.getState() == ExecutableState.ERROR) {
+                            nError++;
+                        } else if (output.getState() == ExecutableState.SUCCEED) {
+                            nSUCCEED++;
+                        } else if (output.getState() == ExecutableState.STOPPED) {
+                            nStopped++;
+                        } else {
+                            nOthers++;
+                        }
+                        continue;
+                    }
+
+                    nReady++;
+                    Integer priority = leftJobPriorities.get(id);
+                    if (priority == null) {
+                        priority = executable.getDefaultPriority();
+                    }
+                    jobPriorityQueue.add(new Pair<>(executable, priority));
+                }
+
+                while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit()
+                        && (executableWithPriority = jobPriorityQueue.poll()) != null) {
+                    addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
+                }
+
+                logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+                        + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
+                        + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
+                        + " others");
+            } catch (Exception e) {
+                logger.warn("Job Fetcher caught a exception " + e);
+            }
+        }
+    }
+
     private class FetcherRunner implements Runnable {
 
         @Override
@@ -218,7 +325,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
         int pollSecond = jobEngineConfig.getPollIntervalSecond();
         logger.info("Fetching jobs every {} seconds", pollSecond);
-        fetcher = new FetcherRunner();
+        fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d216eba3/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 1fa56c4..056b943 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -65,12 +65,18 @@ public class CubingJob extends DefaultChainedExecutable {
     }
 
     public enum CubingJobTypeEnum {
-        BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE");
+        BUILD("BUILD", 20), OPTIMIZE("OPTIMIZE", 5), MERGE("MERGE", 25);
 
         private final String name;
+        private final int defaultPriority;
 
-        CubingJobTypeEnum(String name) {
+        CubingJobTypeEnum(String name, int priority) {
             this.name = name;
+            this.defaultPriority = priority;
+        }
+
+        public int getDefaultPriority() {
+            return defaultPriority;
         }
 
         public String toString() {
@@ -151,6 +157,15 @@ public class CubingJob extends DefaultChainedExecutable {
         super();
     }
 
+    @Override
+    public int getDefaultPriority() {
+        CubingJobTypeEnum jobType = CubingJobTypeEnum.getByName(getJobType());
+        if (jobType == null) {
+            return super.getDefaultPriority();
+        }
+        return jobType.getDefaultPriority();
+    }
+
     protected void setDeployEnvName(String name) {
         setParam(DEPLOY_ENV_NAME, name);
     }

Reply via email to