shaofengshi closed pull request #390: KYLIN-3406 Fix for two inconsistent 
ExecutableManager instances
URL: https://github.com/apache/kylin/pull/390
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays its
diff once it has been merged, so the full diff is reproduced below:

diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 877c0d01d0..21cd8e92d0 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -24,7 +24,6 @@
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -34,9 +33,8 @@
 
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultFetcherRunner.class);
 
-    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context,
-            ExecutableManager executableManager, JobExecutor jobExecutor) {
-        super(jobEngineConfig, context, executableManager, jobExecutor);
+    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, jobExecutor);
     }
 
     @Override
@@ -50,7 +48,7 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIdsInCache()) {
+            for (final String id : 
getExecutableManger().getAllJobIdsInCache()) {
                 if (isJobPoolFull()) {
                     return;
                 }
@@ -60,7 +58,7 @@ synchronized public void run() {
                     continue;
                 }
 
-                final Output outputDigest = 
executableManager.getOutputDigest(id);
+                final Output outputDigest = 
getExecutableManger().getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
                     if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -73,7 +71,7 @@ synchronized public void run() {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
-                            executableManager.forceKillJob(id);
+                            getExecutableManger().forceKillJob(id);
                             nError++;
                         } else {
                             nOthers++;
@@ -82,7 +80,7 @@ synchronized public void run() {
                     continue;
                 }
 
-                final AbstractExecutable executable = 
executableManager.getJob(id);
+                final AbstractExecutable executable = 
getExecutableManger().getJob(id);
                 if (!executable.isReady()) {
                     nOthers++;
                     continue;
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 8e828f1e55..87670e67aa 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -188,8 +188,8 @@ public void execute(AbstractExecutable executable) {
             }
         };
         fetcher = jobEngineConfig.getJobPriorityConsidered()
-                ? new PriorityFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor)
-                : new DefaultFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor);
+                ? new PriorityFetcherRunner(jobEngineConfig, context, 
jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, 
jobExecutor);
         logger.info("Creating fetcher pool instance:" + 
System.identityHashCode(fetcher));
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, 
TimeUnit.SECONDS);
         hasStarted = true;
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 4479943bbe..6f360d27d0 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -237,8 +237,8 @@ public void execute(AbstractExecutable executable) {
             }
         };
         fetcher = jobEngineConfig.getJobPriorityConsidered()
-                ? new PriorityFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor)
-                : new DefaultFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor);
+                ? new PriorityFetcherRunner(jobEngineConfig, context, 
jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, 
jobExecutor);
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, 
TimeUnit.SECONDS);
         hasStarted = true;
 
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
index d98ca33cc0..9d8f20e4df 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -35,15 +35,12 @@
 
     protected JobEngineConfig jobEngineConfig;
     protected DefaultContext context;
-    protected ExecutableManager executableManager;
     protected JobExecutor jobExecutor;
     protected volatile boolean fetchFailed = false;
 
-    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext 
context, ExecutableManager executableManager,
-            JobExecutor jobExecutor) {
+    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext 
context, JobExecutor jobExecutor) {
         this.jobEngineConfig = jobEngineConfig;
         this.context = context;
-        this.executableManager = executableManager;
         this.jobExecutor = jobExecutor;
     }
 
@@ -74,4 +71,8 @@ protected void addToJobPool(AbstractExecutable executable, 
int priority) {
     void setFetchFailed(boolean fetchFailed) {
         this.fetchFailed = fetchFailed;
     }
+
+    ExecutableManager getExecutableManger() {
+        return ExecutableManager.getInstance(jobEngineConfig.getConfig());
+    }
 }
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 1d13afdcf8..0792ed0f5b 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -27,7 +27,6 @@
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -47,9 +46,8 @@ public int compare(Pair<AbstractExecutable, Integer> o1, 
Pair<AbstractExecutable
                 }
             });
 
-    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context,
-            ExecutableManager executableManager, JobExecutor jobExecutor) {
-        super(jobEngineConfig, context, executableManager, jobExecutor);
+    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, jobExecutor);
     }
 
     @Override
@@ -86,14 +84,14 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIdsInCache()) {
+            for (final String id : 
getExecutableManger().getAllJobIdsInCache()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
                     nRunning++;
                     continue;
                 }
 
-                final Output outputDigest = 
executableManager.getOutputDigest(id);
+                final Output outputDigest = 
getExecutableManger().getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
                     if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -106,7 +104,7 @@ synchronized public void run() {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
-                            executableManager.forceKillJob(id);
+                            getExecutableManger().forceKillJob(id);
                             nError++;
                         } else {
                             nOthers++;
@@ -115,7 +113,7 @@ synchronized public void run() {
                     continue;
                 }
 
-                AbstractExecutable executable = executableManager.getJob(id);
+                AbstractExecutable executable = 
getExecutableManger().getJob(id);
                 if (!executable.isReady()) {
                     nOthers++;
                     continue;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to