Wayne1c closed pull request #406: jenkins failed test
URL: https://github.com/apache/kylin/pull/406
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it once the PR is merged:

diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 21cd8e92d0..877c0d01d0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -24,6 +24,7 @@
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -33,8 +34,9 @@
 
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultFetcherRunner.class);
 
-    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context, JobExecutor jobExecutor) {
-        super(jobEngineConfig, context, jobExecutor);
+    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context,
+            ExecutableManager executableManager, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, executableManager, jobExecutor);
     }
 
     @Override
@@ -48,7 +50,7 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : 
getExecutableManger().getAllJobIdsInCache()) {
+            for (final String id : executableManager.getAllJobIdsInCache()) {
                 if (isJobPoolFull()) {
                     return;
                 }
@@ -58,7 +60,7 @@ synchronized public void run() {
                     continue;
                 }
 
-                final Output outputDigest = 
getExecutableManger().getOutputDigest(id);
+                final Output outputDigest = 
executableManager.getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
                     if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -71,7 +73,7 @@ synchronized public void run() {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
-                            getExecutableManger().forceKillJob(id);
+                            executableManager.forceKillJob(id);
                             nError++;
                         } else {
                             nOthers++;
@@ -80,7 +82,7 @@ synchronized public void run() {
                     continue;
                 }
 
-                final AbstractExecutable executable = 
getExecutableManger().getJob(id);
+                final AbstractExecutable executable = 
executableManager.getJob(id);
                 if (!executable.isReady()) {
                     nOthers++;
                     continue;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 87670e67aa..8e828f1e55 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -188,8 +188,8 @@ public void execute(AbstractExecutable executable) {
             }
         };
         fetcher = jobEngineConfig.getJobPriorityConsidered()
-                ? new PriorityFetcherRunner(jobEngineConfig, context, 
jobExecutor)
-                : new DefaultFetcherRunner(jobEngineConfig, context, 
jobExecutor);
+                ? new PriorityFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor);
         logger.info("Creating fetcher pool instance:" + 
System.identityHashCode(fetcher));
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, 
TimeUnit.SECONDS);
         hasStarted = true;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 6f360d27d0..4479943bbe 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -237,8 +237,8 @@ public void execute(AbstractExecutable executable) {
             }
         };
         fetcher = jobEngineConfig.getJobPriorityConsidered()
-                ? new PriorityFetcherRunner(jobEngineConfig, context, 
jobExecutor)
-                : new DefaultFetcherRunner(jobEngineConfig, context, 
jobExecutor);
+                ? new PriorityFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, 
executableManager, jobExecutor);
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, 
TimeUnit.SECONDS);
         hasStarted = true;
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
index 9d8f20e4df..d98ca33cc0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -35,12 +35,15 @@
 
     protected JobEngineConfig jobEngineConfig;
     protected DefaultContext context;
+    protected ExecutableManager executableManager;
     protected JobExecutor jobExecutor;
     protected volatile boolean fetchFailed = false;
 
-    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext 
context, JobExecutor jobExecutor) {
+    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext 
context, ExecutableManager executableManager,
+            JobExecutor jobExecutor) {
         this.jobEngineConfig = jobEngineConfig;
         this.context = context;
+        this.executableManager = executableManager;
         this.jobExecutor = jobExecutor;
     }
 
@@ -71,8 +74,4 @@ protected void addToJobPool(AbstractExecutable executable, 
int priority) {
     void setFetchFailed(boolean fetchFailed) {
         this.fetchFailed = fetchFailed;
     }
-
-    ExecutableManager getExecutableManger() {
-        return ExecutableManager.getInstance(jobEngineConfig.getConfig());
-    }
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 0792ed0f5b..1d13afdcf8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -27,6 +27,7 @@
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -46,8 +47,9 @@ public int compare(Pair<AbstractExecutable, Integer> o1, 
Pair<AbstractExecutable
                 }
             });
 
-    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context, JobExecutor jobExecutor) {
-        super(jobEngineConfig, context, jobExecutor);
+    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, 
DefaultContext context,
+            ExecutableManager executableManager, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, executableManager, jobExecutor);
     }
 
     @Override
@@ -84,14 +86,14 @@ synchronized public void run() {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 
0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : 
getExecutableManger().getAllJobIdsInCache()) {
+            for (final String id : executableManager.getAllJobIdsInCache()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
                     nRunning++;
                     continue;
                 }
 
-                final Output outputDigest = 
getExecutableManger().getOutputDigest(id);
+                final Output outputDigest = 
executableManager.getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
                     if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -104,7 +106,7 @@ synchronized public void run() {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
-                            getExecutableManger().forceKillJob(id);
+                            executableManager.forceKillJob(id);
                             nError++;
                         } else {
                             nOthers++;
@@ -113,7 +115,7 @@ synchronized public void run() {
                     continue;
                 }
 
-                AbstractExecutable executable = 
getExecutableManger().getJob(id);
+                AbstractExecutable executable = executableManager.getJob(id);
                 if (!executable.isReady()) {
                     nOthers++;
                     continue;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to