Fixed AIRAVATA-1611
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8b2a6b06 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8b2a6b06 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8b2a6b06 Branch: refs/heads/master Commit: 8b2a6b0625cef632cb75a7b8e04fb27582652112 Parents: d25441a Author: shamrath <[email protected]> Authored: Fri Feb 27 14:46:13 2015 -0500 Committer: shamrath <[email protected]> Committed: Fri Feb 27 14:46:13 2015 -0500 ---------------------------------------------------------------------- .../airavata/common/utils/ServerSettings.java | 15 +++++ .../main/resources/airavata-server.properties | 1 + .../main/resources/airavata-server.properties | 1 + .../server/OrchestratorServerHandler.java | 15 +---- .../engine/WorkflowEnactmentService.java | 64 ++++++++++++++++++++ .../workflow/engine/WorkflowFactoryImpl.java | 4 +- 6 files changed, 85 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/8b2a6b06/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index b076e6a..5073949 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -61,6 +61,11 @@ public class ServerSettings extends ApplicationSettings { public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is desabled +// Workflow Enactment Service component configuration. + private static final String ENACTMENT_THREAD_POOL_SIZE = "enactment.thread.pool.size"; + private static final int DEFAULT_ENACTMENT_THREAD_POOL_SIZE = 10; + + private static boolean stopAllThreads = false; public static String getDefaultUser() throws ApplicationSettingsException { @@ -187,4 +192,14 @@ public class ServerSettings extends ApplicationSettings { } return null; } + + public static int getEnactmentThreadPoolSize() { + String threadPoolSize = null; + try { + threadPoolSize = getSetting(ENACTMENT_THREAD_POOL_SIZE); + } catch (ApplicationSettingsException e) { + return DEFAULT_ENACTMENT_THREAD_POOL_SIZE; + } + return Integer.valueOf(threadPoolSize); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/8b2a6b06/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index e309901..12566ec 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -160,6 +160,7 @@ gfac.passive=false #provenanceWriterThreadPoolSize=20 #gfac.embedded=true #workflowserver=org.apache.airavata.api.server.WorkflowServer +enactment.thread.pool.size=10 ########################################################################### http://git-wip-us.apache.org/repos/asf/airavata/blob/8b2a6b06/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties index 2ecdeb6..3cd3cdb 100644 --- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties +++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties @@ -144,6 +144,7 @@ trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates #provenanceWriterThreadPoolSize=20 #gfac.embedded=true #workflowserver=org.apache.airavata.api.server.WorkflowServer +enactment.thread.pool.size=10 ########################################################################### http://git-wip-us.apache.org/repos/asf/airavata/blob/8b2a6b06/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index a4e105e..d168c26 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -66,6 +66,7 @@ import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetai import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants; import org.apache.airavata.orchestrator.util.DataModelUtils; import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter; +import org.apache.airavata.simple.workflow.engine.WorkflowEnactmentService; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.*; @@ -652,19 +653,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, } private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { -// try { -// WorkflowEngine workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); -// workflowEngine.launchExperiment(experimentId, airavataCredStoreToken); -// } catch (WorkflowEngineException e) { -// log.errorId(experimentId, "Error while launching experiment.", e); -// } try { - SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( - experimentId, airavataCredStoreToken,getGatewayName(), getRabbitMQProcessPublisher()); - - Thread thread = new Thread(simpleWorkflowInterpreter); - thread.start(); -// simpleWorkflowInterpreter.run(); + WorkflowEnactmentService.getInstance(). + submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), getRabbitMQProcessPublisher()); } catch (RegistryException e) { log.error("Error while launching workflow", e); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/8b2a6b06/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java new file mode 100644 index 0000000..ec5acfa --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; +import org.apache.airavata.registry.cpi.RegistryException; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class WorkflowEnactmentService { + + private static WorkflowEnactmentService workflowEnactmentService; + private ExecutorService executor; + + private WorkflowEnactmentService () { + executor = Executors.newFixedThreadPool(getThreadPoolSize()); + } + + public static WorkflowEnactmentService getInstance(){ + if (workflowEnactmentService == null) { + synchronized (WorkflowEnactmentService.class) { + if (workflowEnactmentService == null) { + workflowEnactmentService = new WorkflowEnactmentService(); + } + } + } + return workflowEnactmentService; + } + + public void submitWorkflow(String experimentId, + String credentialToken, + String gatewayName, + RabbitMQProcessPublisher publisher) throws RegistryException { + + SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( + experimentId, credentialToken,gatewayName, publisher); + executor.execute(simpleWorkflowInterpreter); + } + + private int getThreadPoolSize() { + return ServerSettings.getEnactmentThreadPoolSize(); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8b2a6b06/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java index b12260d..e8674f2 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java @@ -33,15 +33,13 @@ public class WorkflowFactoryImpl implements WorkflowFactory { private WorkflowParser workflowParser; - private static final String synch = "sync"; - private WorkflowFactoryImpl(){ } public static WorkflowFactoryImpl getInstance() { if (workflowFactoryImpl == null) { - synchronized (synch) { + synchronized (WorkflowFactory.class) { if (workflowFactoryImpl == null) { workflowFactoryImpl = new WorkflowFactoryImpl(); }
