movig to cachedTP
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ae15740d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ae15740d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ae15740d Branch: refs/heads/master Commit: ae15740d87dd8d128eb2f54fdcf1f9a73e2e3037 Parents: 98f2186 Author: Lahiru Gunathilake <[email protected]> Authored: Wed Apr 15 10:37:24 2015 -0400 Committer: Lahiru Gunathilake <[email protected]> Committed: Wed Apr 15 10:37:24 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/airavata/gfac/server/GfacServer.java | 2 +- .../apache/airavata/gfac/server/GfacServerHandler.java | 10 +++------- .../airavata/gfac/monitor/email/EmailBasedMonitor.java | 8 ++++++-- .../gfac/monitor/impl/pull/qstat/HPCPullMonitor.java | 8 ++++---- 4 files changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ae15740d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java index 01115b6..01c12ad 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java @@ -110,7 +110,7 @@ public class GfacServer implements IServer{ setStatus(IServer.ServerStatus.STOPING); server.stop(); } - GFacThreadPoolExecutor.getFixedThreadPool().shutdownNow(); + GFacThreadPoolExecutor.getThreadPool().shutdownNow(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/ae15740d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 524a8a9..462fbd8 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -246,7 +246,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { */ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException { requestCount++; - logger.info("-----------------------------------------------------" + requestCount+"-----------------------------------------------------"); + logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------"); logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId); GFac gfac = getGfac(); InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); @@ -255,12 +255,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " + "{}", experimentId, taskId, gatewayId); - try { - GFacThreadPoolExecutor.getFixedThreadPool().execute(inputHandlerWorker); - } catch (ApplicationSettingsException e) { - logger.error(e.getMessage(), e); - throw new TException(e); - } + GFacThreadPoolExecutor.getThreadPool().execute(inputHandlerWorker); // we immediately return when we have a threadpool return true; @@ -405,6 +400,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { + "' and with message type '" + message.getType()); } catch (TException e) { logger.error(e.getMessage(), e); //nobody is listening so nothing to throw + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/ae15740d/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index 631e1a4..95c98b8 100644 --- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -21,6 +21,7 @@ package org.apache.airavata.gfac.monitor.email; import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; import org.apache.airavata.common.utils.ServerSettings; @@ -54,7 +55,6 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; public class EmailBasedMonitor implements Runnable{ - private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class); private static final String PBS_CONSULT_SDSC_EDU = "[email protected]"; @@ -207,7 +207,11 @@ public class EmailBasedMonitor implements Runnable{ JobState resultState = jobStatusResult.getState(); jEC.getJobDetails().setJobStatus(new JobStatus(resultState)); if (resultState == JobState.COMPLETE) { - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher())); + try { + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher())); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage(), e); + } }else if (resultState == JobState.QUEUED) { // TODO - publish queued rabbitmq message }else if (resultState == JobState.FAILED) { http://git-wip-us.apache.org/repos/asf/airavata/blob/ae15740d/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 26d3385..04f9e7d 100644 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -197,7 +197,7 @@ public class HPCPullMonitor extends PullMonitor { sendNotification(iMonitorID); logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); Thread.sleep(10000); - GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); break; } } @@ -225,7 +225,7 @@ public class HPCPullMonitor extends PullMonitor { sendNotification(iMonitorID); logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); Thread.sleep(10000); - GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); break; } } @@ -250,7 +250,7 @@ public class HPCPullMonitor extends PullMonitor { removeList.add(iMonitorID); logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); - GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); } iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); @@ -288,7 +288,7 @@ public class HPCPullMonitor extends PullMonitor { sendNotification(iMonitorID); // CommonUtils.removeMonitorFromQueue(take, iMonitorID); removeList.add(iMonitorID); - GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else { iMonitorID.setFailedCount(0); }
