Refactored GfacServerHandler initialization, changed GFac interface , added GfacImpl
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8535ff1d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8535ff1d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8535ff1d Branch: refs/heads/master Commit: 8535ff1d24a228c793075b92cfdc7f08d6ab7bdb Parents: 9500724 Author: Shameera Rathanyaka <[email protected]> Authored: Fri Jun 12 12:05:18 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Fri Jun 12 12:05:18 2015 -0400 ---------------------------------------------------------------------- .../AiravataExperimentStatusUpdator.java | 10 +- .../exception/AiravataStartupException.java | 46 +++++ .../exception/AiravataStratupException.java | 9 - .../apache/airavata/common/utils/Constants.java | 31 +-- .../common/utils/LocalEventPublisher.java | 47 +++++ .../airavata/common/utils/MonitorPublisher.java | 47 ----- .../airavata/common/utils/ServerSettings.java | 41 +++- .../main/resources/airavata-server.properties | 109 +++++------ .../gfac/bes/provider/impl/BESProvider.java | 2 +- .../gfac/bes/security/X509SecurityContext.java | 8 +- .../gfac/bes/utils/DataTransferrer.java | 4 +- .../apache/airavata/gfac/core/Constants.java | 82 -------- .../org/apache/airavata/gfac/core/GFac.java | 36 ++-- .../airavata/gfac/core/GFacConfiguration.java | 34 ++-- .../airavata/gfac/core/GFacConstants.java | 86 +++++++++ .../apache/airavata/gfac/core/GFacUtils.java | 16 +- .../apache/airavata/gfac/core/GFacWorker.java | 37 ++++ .../apache/airavata/gfac/core/RequestData.java | 10 +- .../apache/airavata/gfac/core/Scheduler.java | 22 +-- .../gfac/core/context/JobExecutionContext.java | 12 +- .../gfac/core/context/ProcessContext.java | 89 +++++++++ .../gfac/core/handler/AbstractHandler.java | 6 +- .../security/TokenizedMyProxyAuthInfo.java | 10 +- .../gfac/impl/AiravataJobStatusUpdator.java | 10 +- .../gfac/impl/AiravataTaskStatusUpdator.java | 12 +- .../impl/AiravataWorkflowNodeStatusUpdator.java | 10 +- .../airavata/gfac/impl/BetterGfacImpl.java | 95 +++++----- .../org/apache/airavata/gfac/impl/GFacImpl.java | 28 +++ .../airavata/gfac/impl/OutHandlerWorker.java | 16 +- .../gfac/local/provider/impl/LocalProvider.java | 10 +- .../monitor/core/AiravataAbstractMonitor.java | 3 +- .../gfac/monitor/email/EmailBasedMonitor.java | 2 +- .../handlers/GridPullMonitorHandler.java | 2 +- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 16 +- .../monitor/impl/push/amqp/AMQPMonitor.java | 16 +- .../monitor/impl/push/amqp/BasicConsumer.java | 6 +- .../gfac/ssh/handler/SSHOutputHandler.java | 4 +- .../gfac/ssh/provider/impl/SSHProvider.java | 22 +-- .../gfac/ssh/security/TokenizedSSHAuthInfo.java | 10 +- .../airavata/gfac/ssh/util/GFACSSHUtils.java | 4 +- .../gfac/services/impl/LocalProviderTest.java | 4 +- .../apache/airavata/job/AMQPMonitorTest.java | 10 +- .../job/QstatMonitorTestWithMyProxyAuth.java | 6 +- .../airavata/gfac/server/GfacServerHandler.java | 189 +++++++------------ .../engine/util/ProxyMonitorPublisher.java | 8 +- 45 files changed, 697 insertions(+), 580 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index 5f25f8a..1d1f1ed 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@ -26,7 +26,7 @@ import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.Constants; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.MessageContext; @@ -51,7 +51,7 @@ import java.util.Calendar; public class AiravataExperimentStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class); private ExperimentCatalog airavataExperimentCatalog; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private Publisher publisher; private CuratorFramework curatorClient; private RabbitMQTaskLaunchConsumer consumer; @@ -111,7 +111,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId(), nodeStatus.getWorkflowNodeIdentity().getGatewayId()); - monitorPublisher.publish(event); + localEventPublisher.publish(event); String messageId = AiravataUtils.getId("EXPERIMENT"); MessageContext msgCntxt = new MessageContext(event, MessageType.EXPERIMENT, messageId, nodeStatus.getWorkflowNodeIdentity().getGatewayId()); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); @@ -203,8 +203,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener for (Object configuration : configurations) { if (configuration instanceof ExperimentCatalog){ this.airavataExperimentCatalog =(ExperimentCatalog)configuration; - } else if (configuration instanceof MonitorPublisher){ - this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof LocalEventPublisher){ + this.localEventPublisher =(LocalEventPublisher) configuration; } else if (configuration instanceof Publisher){ this.publisher=(Publisher) configuration; }else if (configuration instanceof RabbitMQTaskLaunchConsumer) { http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java b/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java new file mode 100644 index 0000000..2ec9f5a --- /dev/null +++ b/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStartupException.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.common.exception; + +public class AiravataStartupException extends Exception { + private static final long serialVersionUID = 495204868100143133L; + + public AiravataStartupException() { + super(); + } + + public AiravataStartupException(String message) { + super(message); + } + + public AiravataStartupException(String message, Throwable cause) { + super(message, cause); + } + + public AiravataStartupException(Throwable cause) { + super(cause); + } + + protected AiravataStartupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java b/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java deleted file mode 100644 index e503f4b..0000000 --- a/modules/commons/src/main/java/org/apache/airavata/common/exception/AiravataStratupException.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.apache.airavata.common.exception; - -/** - * Created by syodage on 6/11/15. - */ -public class AiravataStratupException extends Exception { - private static final long serialVersionUID = 495204868100143133L; - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java index 6855a8e..6e1cb84 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java @@ -27,37 +27,10 @@ package org.apache.airavata.common.utils; public final class Constants { public static final String USER_IN_SESSION = "userName"; -// public static final String GATEWAY_NAME = "gateway_id"; - public static final String GFAC_CONFIG_XML = "gfac-config.xml"; - public static final String PUSH = "push"; - public static final String PULL = "pull"; - public static final String API_SERVER_PORT = "apiserver.server.port"; - public static final String API_SERVER_HOST = "apiserver.server.host"; - public static final String REGISTRY_JDBC_URL = "registry.jdbc.url"; - public static final String APPCATALOG_JDBC_URL = "appcatalog.jdbc.url"; - public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; - public static final String RABBITMQ_EXCHANGE = "rabbitmq.exchange.name"; - public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host"; - public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port"; - public static final String GFAC_SERVER_HOST = "gfac.server.host"; - public static final String GFAC_SERVER_PORT = "gfac.server.port"; - public static final String CREDENTIAL_SERVER_HOST = "credential.store.server.host"; - public static final String CREDENTIAL_SERVER_PORT = "credential.store.server.port"; - public static final String ZOOKEEPER_EXPERIMENT_CATALOG = "experiment-catalog"; - public static final String ZOOKEEPER_APPCATALOG = "app-catalog"; - public static final String ZOOKEEPER_RABBITMQ = "rabbit-mq"; - public static final String ZOOKEEPER_SERVER_HOST = "zookeeper.server.host"; - public static final String ZOOKEEPER_SERVER_PORT = "zookeeper.server.port"; - public static final String ZOOKEEPER_API_SERVER_NODE = "airavata-server"; - public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NODE = "orchestrator-server"; - public static final String ZOOKEEPER_GFAC_SERVER_NODE = "gfac-server"; - public static final String ZOOKEEPER_GFAC_EXPERIMENT_NODE = "gfac-experiments"; - public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name"; - public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name"; - public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name"; + + public static final String STAT = "stat"; public static final String JOB = "job"; - public static final String ZOOKEEPER_TIMEOUT = "zookeeper.timeout"; //API security relates property names public static final String IS_API_SECURED = "api.secured"; public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server"; http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java new file mode 100644 index 0000000..2b5a1d8 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/LocalEventPublisher.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.common.utils; + +import com.google.common.eventbus.EventBus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LocalEventPublisher { + private final static Logger logger = LoggerFactory.getLogger(LocalEventPublisher.class); + private EventBus eventBus; + + public LocalEventPublisher(EventBus eventBus) { + this.eventBus = eventBus; + } + + public void registerListener(Object listener) { + eventBus.register(listener); + } + + public void unregisterListener(Object listener) { + eventBus.unregister(listener); + } + + public void publish(Object o) { + eventBus.post(o); + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java deleted file mode 100644 index 7f64e86..0000000 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.common.utils; - -import com.google.common.eventbus.EventBus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MonitorPublisher{ - private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class); - private EventBus eventBus; - - public MonitorPublisher(EventBus eventBus) { - this.eventBus = eventBus; - } - - public void registerListener(Object listener) { - eventBus.register(listener); - } - - public void unregisterListener(Object listener) { - eventBus.unregister(listener); - } - - public void publish(Object o) { - eventBus.post(o); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 8370e40..3f312fd 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -33,8 +33,24 @@ public class ServerSettings extends ApplicationSettings { private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway"; private static final String SERVER_CONTEXT_ROOT = "server.context-root"; - public static final String EMBEDDED_ZK = "embedded.zk"; public static final String IP = "ip"; + // Orchestrator Constants + public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host"; + + public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port"; + // Gfac constants + public static final String GFAC_SERVER_HOST = "gfac.server.host"; + public static final String GFAC_SERVER_PORT = "gfac.server.port"; + public static final String GFAC_SERVER_NAME = "gfac.server.name"; + public static final String GFAC_CONFIG_XML = "gfac-config.xml"; + // Credential Store constants + public static final String CREDENTIAL_SERVER_HOST = "credential.store.server.host"; + public static final String CREDENTIAL_SERVER_PORT = "credential.store.server.port"; + // Zookeeper + curator constants + public static final String EMBEDDED_ZK = "embedded.zk"; + public static final String ZOOKEEPER_SERVER_CONNECTION = "zookeeper.server.connection"; + public static final String ZOOKEEPER_TIMEOUT = "zookeeper.timeout"; + private static final String CREDENTIAL_STORE_DB_URL = "credential.store.jdbc.url"; private static final String CREDENTIAL_STORE_DB_USER = "credential.store.jdbc.user"; @@ -57,7 +73,6 @@ public class ServerSettings extends ApplicationSettings { public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable"; public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids"; public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags"; - public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is desabled public static final String LAUNCH_QUEUE_NAME = "launch.queue.name"; public static final String CANCEL_QUEUE_NAME = "cancel.queue.name"; @@ -186,11 +201,6 @@ public class ServerSettings extends ApplicationSettings { return getSetting(TASK_LAUNCH_PUBLISHER); } - public static boolean isGFacPassiveMode()throws ApplicationSettingsException { - String setting = getSetting(GFAC_PASSIVE); - return Boolean.parseBoolean(setting); - } - public static boolean isEmbeddedZK() { return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true")); } @@ -267,4 +277,21 @@ public class ServerSettings extends ApplicationSettings { public static String getAdminPassword() throws ApplicationSettingsException { return getSetting(Constants.ADMIN_PASSWORD); } + + public static String getZookeeperConnection() throws ApplicationSettingsException { + return getSetting(ZOOKEEPER_SERVER_CONNECTION, "localhost:2181"); + } + + public static String getGFacServerName() throws ApplicationSettingsException { + return getSetting(GFAC_SERVER_NAME); + } + + public static String getGfacServerHost() throws ApplicationSettingsException { + return getSetting(GFAC_SERVER_HOST); + } + + public static String getGFacServerPort() throws ApplicationSettingsException { + return getSetting(GFAC_SERVER_PORT); + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index b5c29a5..9d20609 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -68,20 +68,60 @@ appcatalog.validationQuery=SELECT 1 from CONFIGURATION ########################################################################### # Server module Configuration ########################################################################### - servers=apiserver,orchestrator,gfac,credentialstore #shutdown.trategy=NONE shutdown.trategy=SELF_TERMINATE -apiserver.server.host=localhost -apiserver.server.port=8930 -apiserver.server.min.threads=50 +########################################################################### +# API Server Configurations +########################################################################### +apiserver=org.apache.airavata.api.server.AiravataAPIServer +apiserver.name=apiserver-node0 +apiserver.host=localhost +apiserver.port=8930 +apiserver.min.threads=50 + + +########################################################################### +# Orchestrator Server Configurations +########################################################################### +orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer +orchestrator.server.name=orchestrator-node0 orchestrator.server.host=localhost orchestrator.server.port=8940 +orchestrator.server.min.threads=50 +#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter +#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter +#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter +job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator +submitter.interval=10000 +threadpool.size=10 +start.submitter=true +embedded.mode=true +enable.validation=true + + +########################################################################### +# GFac Server Configurations +########################################################################### +gfac.server.name=gfac-node0 gfac.server.host=localhost gfac.server.port=8950 -orchestrator.server.min.threads=50 +host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler + + + +########################################################################### +# Airavata Workflow Interpreter Configurations +########################################################################### +workflowserver=org.apache.airavata.api.server.WorkflowServer +enactment.thread.pool.size=10 + +#to define custom workflow parser user following property +#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowParser + + ########################################################################### # Job Scheduler can send informative email messages to you about the status of your job. @@ -139,7 +179,6 @@ myproxy.password= myproxy.life=3600 # XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates -gfac.passive=true # SSH PKI key pair or ssh password can be used SSH based authentication is used. # if user specify both password authentication gets the higher preference @@ -156,33 +195,6 @@ gfac.passive=true #bes.ca.key.path=<location>/certificates/cakey.pem #bes.ca.key.pass=passphrase - -########################################################################### -# Airavata Workflow Interpreter Configurations -########################################################################### - -#runInThread=true -#provenance=true -#provenanceWriterThreadPoolSize=20 -#gfac.embedded=true -#workflowserver=org.apache.airavata.api.server.WorkflowServer -enactment.thread.pool.size=10 - -#to define custom workflow parser user following property -#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowParser - - -########################################################################### -# API Server module Configuration -########################################################################### -apiserver=org.apache.airavata.api.server.AiravataAPIServer - -########################################################################### -# Workflow Server module Configuration -########################################################################### - -workflowserver=org.apache.airavata.api.server.WorkflowServer - ########################################################################### # Advance configuration to change service implementations ########################################################################### @@ -191,7 +203,6 @@ TwoPhase=true # # Class which implemented HostScheduler interface. It will determine the which host to submit the request # -host.scheduler=org.apache.airavata.gfac.core.scheduler.impl.SimpleHostScheduler ########################################################################### # Monitoring module Configuration @@ -213,15 +224,11 @@ email.based.monitoring.period=10000 ########################################################################### # AMQP Notification Configuration ########################################################################### - - amqp.notification.enable=1 - amqp.broker.host=localhost amqp.broker.port=5672 amqp.broker.username=guest amqp.broker.password=guest - amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl @@ -251,35 +258,11 @@ activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher rabbitmq.exchange.name=airavata_rabbitmq_exchange ########################################################################### -# Orchestrator module Configuration -########################################################################### - -#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter -#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter -#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter -job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator -submitter.interval=10000 -threadpool.size=10 -start.submitter=true -embedded.mode=true -enable.validation=true -orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer - -########################################################################### # Zookeeper Server Configuration ########################################################################### - embedded.zk=true -zookeeper.server.host=localhost -zookeeper.server.port=2181 -airavata-server=/api-server +zookeeper.server.connection=localhost:2181 zookeeper.timeout=30000 -orchestrator-server=/orchestrator-server -gfac-server=/gfac-server -gfac-experiments=/gfac-experiments -gfac-server-name=gfac-node0 -orchestrator-server-name=orch-node0 -airavata-server-name=api-node0 ######################################################################## ## API Security Configuration http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index b166593..e629150 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -449,6 +449,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider, log.debug(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " + "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), jobStatus.getJobIdentity().getTaskId()); - jobExecutionContext.getMonitorPublisher().publish(jobStatus); + jobExecutionContext.getLocalEventPublisher().publish(jobStatus); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java index ff60e99..d9b183f 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/X509SecurityContext.java @@ -26,8 +26,8 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.credential.store.credential.Credential; import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.context.AbstractSecurityContext; -import org.apache.airavata.gfac.core.Constants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.RequestData; import org.apache.airavata.gfac.bes.utils.MyProxyLogon; @@ -105,13 +105,13 @@ public class X509SecurityContext extends AbstractSecurityContext { log.info("Current directory " + f.getAbsolutePath()); throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath); } else { - System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath()); + System.setProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath()); } } private static void setUpTrustedCertificatePath() throws ApplicationSettingsException { - String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION); + String trustedCertificatePath = ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION); setUpTrustedCertificatePath(trustedCertificatePath); } @@ -122,7 +122,7 @@ public class X509SecurityContext extends AbstractSecurityContext { * @return The trusted certificate path as a string. */ public static String getTrustedCertificatePath() { - return System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY); + return System.getProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY); } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java index 623a0b6..f20f18b 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; @@ -209,7 +209,7 @@ public class DataTransferrer { String temp = null; while ((temp = instream.readLine()) != null) { buff.append(temp); - buff.append(Constants.NEWLINE); + buff.append(GFacConstants.NEWLINE); } log.info("finish read file:" + localFile); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java deleted file mode 100644 index 9f89256..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Constants.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.gfac.core; - -public class Constants { - public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler"; - public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler"; - public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler"; - - public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='"; - public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler"; - public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler"; - public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler"; - - - public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='"; - public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='"; - public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='"; - public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler"; - public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler"; - - public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class"; - public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security"; - public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission"; - public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode"; - public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class"; - public static final String NEWLINE = System.getProperty("line.separator"); - public static final String INPUT_DATA_DIR_VAR_NAME = "input"; - public static final String OUTPUT_DATA_DIR_VAR_NAME = "output"; - public static final int DEFAULT_GSI_FTP_PORT = 2811; - public static final String _127_0_0_1 = "127.0.0.1"; - public static final String LOCALHOST = "localhost"; - - public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id"; - public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id"; - public static final String PROP_BROKER_URL = "broker.url"; - public static final String PROP_TOPIC = "topic"; - public static final String SPACE = " "; - public static final int COMMAND_EXECUTION_TIMEOUT = 5; - public static final String EXECUTABLE_NAME = "run.sh"; - - public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location"; - public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR"; - public static final String MYPROXY_SERVER = "myproxy.server"; - public static final String MYPROXY_SERVER_PORT = "myproxy.port"; - public static final String MYPROXY_USER = "myproxy.username"; - public static final String MYPROXY_PASS = "myproxy.password"; - public static final String MYPROXY_LIFE = "myproxy.life"; - /* - * SSH properties - */ - public static final String SSH_PRIVATE_KEY = "private.ssh.key"; - public static final String SSH_PUBLIC_KEY = "public.ssh.key"; - public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass"; - public static final String SSH_USER_NAME = "ssh.username"; - public static final String SSH_PASSWORD = "ssh.password"; - public static final String PROPERTY = "property"; - public static final String NAME = "name"; - public static final String VALUE = "value"; - public static final String OUTPUT_DATA_DIR = "output.location"; - - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java index a8c0d58..fca4c98 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFac.java @@ -20,8 +20,9 @@ */ package org.apache.airavata.gfac.core; +import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.registry.cpi.AppCatalog; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.registry.cpi.ExperimentCatalog; import org.apache.curator.framework.CuratorFramework; @@ -33,44 +34,33 @@ import org.apache.curator.framework.CuratorFramework; public interface GFac { /** - * Initialized method, this method must call one time before use any other method. - * @param experimentCatalog - * @param appCatalog - * @param curatorClient - * @param publisher - * @return - */ - public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient, MonitorPublisher publisher); - - /** - * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers - * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry + * Launching a process, this method run process inflow task and job submission task. * - * @param experimentID + * @param processContext * @return boolean Successful acceptence of the jobExecution returns a true value * @throws GFacException */ - public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException; + public boolean submitProcess(ProcessContext processContext) throws GFacException; /** - * This method can be used in a handler to ivvoke outhandler asynchronously - * @param jobExecutionContext + * This will invoke outflow tasks for a given process. + * @param processContext * @throws GFacException */ - public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; + public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException; /** - * This method can be used to handle re-run case asynchronously - * @param jobExecutionContext + * This will reInvoke outflow tasks for a given process. + * @param processContext * @throws GFacException */ - public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; + public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException; /** - * This operation can be used to cancel an already running experiment + * This operation can be used to cancel an already running process. * @return Successful cancellation will return true * @throws GFacException */ - public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException; + public boolean cancelProcess(ProcessContext processContext)throws GFacException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java index ae82a72..3d2a320 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConfiguration.java @@ -94,15 +94,15 @@ public class GFacConfiguration { public void setInHandlers(String providerName, String applicationName) { try { - this.inHandlers = getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_GLOBAL_INFLOW_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + this.inHandlers = getHandlerConfig(handlerDoc, GFacConstants.XPATH_EXPR_GLOBAL_INFLOW_HANDLERS, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); if (applicationName != null) { - String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END; - List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + String xPath = GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + GFacConstants.XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END; + List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); this.inHandlers.addAll(handlers); } if (providerName != null) { - String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END; - List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); + String xPath = GFacConstants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + GFacConstants.XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END; + List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); this.inHandlers.addAll(handlers); } } catch (XPathExpressionException e) { @@ -112,15 +112,15 @@ public class GFacConfiguration { public void setOutHandlers(String providerName, String applicationName) { try { - this.outHandlers = getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + this.outHandlers = getHandlerConfig(handlerDoc, GFacConstants.XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); if (applicationName != null) { - String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END; - List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + String xPath = GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + GFacConstants.XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END; + List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); this.outHandlers.addAll(handlers); } if(providerName != null) { - String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END; - List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + String xPath = GFacConstants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + GFacConstants.XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END; + List<GFacHandlerConfig> handlers = getHandlerConfig(handlerDoc, xPath, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); this.outHandlers.addAll(handlers); } } catch (XPathExpressionException e) { @@ -200,9 +200,9 @@ public class GFacConfiguration { className = ((Element) nl.item(i)).getAttribute(attribute); NodeList childNodes = (nl.item(i)).getChildNodes(); for(int j = 0;j < childNodes.getLength();j++){ - if(Constants.PROPERTY.equals(childNodes.item(j).getNodeName())) { - String name = ((Element) childNodes.item(j)).getAttribute(Constants.NAME); - String value = ((Element) childNodes.item(j)).getAttribute(Constants.VALUE); + if(GFacConstants.PROPERTY.equals(childNodes.item(j).getNodeName())) { + String name = ((Element) childNodes.item(j)).getAttribute(GFacConstants.NAME); + String value = ((Element) childNodes.item(j)).getAttribute(GFacConstants.VALUE); properties.put(name, value); } } @@ -226,9 +226,9 @@ public class GFacConfiguration { if (className != null && !className.equals("")) { NodeList childNodes = (nl.item(i)).getChildNodes(); for (int j = 0; j < childNodes.getLength(); j++) { - if (Constants.PROPERTY.equals(childNodes.item(j).getNodeName())) { - String name = ((Element) childNodes.item(j)).getAttribute(Constants.NAME); - String value = ((Element) childNodes.item(j)).getAttribute(Constants.VALUE); + if (GFacConstants.PROPERTY.equals(childNodes.item(j).getNodeName())) { + String name = ((Element) childNodes.item(j)).getAttribute(GFacConstants.NAME); + String value = ((Element) childNodes.item(j)).getAttribute(GFacConstants.VALUE); properties.put(name, value); } } @@ -273,7 +273,7 @@ public class GFacConfiguration { DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); handlerDoc = docBuilder.parse(configFile); - return getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_DAEMON_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + return getHandlerConfig(handlerDoc, GFacConstants.XPATH_EXPR_DAEMON_HANDLERS, GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); } public static Document getHandlerDoc() { return handlerDoc; http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java new file mode 100644 index 0000000..621eeac --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.core; + +public class GFacConstants { + public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler"; + public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler"; + public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler"; + + public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='"; + public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler"; + public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler"; + public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler"; + + + public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='"; + public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='"; + public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='"; + public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler"; + public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler"; + + public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class"; + public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security"; + public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission"; + public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode"; + public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class"; + public static final String NEWLINE = System.getProperty("line.separator"); + public static final String INPUT_DATA_DIR_VAR_NAME = "input"; + public static final String OUTPUT_DATA_DIR_VAR_NAME = "output"; + public static final int DEFAULT_GSI_FTP_PORT = 2811; + public static final String _127_0_0_1 = "127.0.0.1"; + public static final String LOCALHOST = "localhost"; + + public static final String ZOOKEEPER_SERVERS_NODE = "/servers"; + public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac"; + public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments"; + + public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id"; + public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id"; + public static final String PROP_BROKER_URL = "broker.url"; + public static final String PROP_TOPIC = "topic"; + public static final String SPACE = " "; + public static final int COMMAND_EXECUTION_TIMEOUT = 5; + public static final String EXECUTABLE_NAME = "run.sh"; + + public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location"; + public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR"; + public static final String MYPROXY_SERVER = "myproxy.server"; + public static final String MYPROXY_SERVER_PORT = "myproxy.port"; + public static final String MYPROXY_USER = "myproxy.username"; + public static final String MYPROXY_PASS = "myproxy.password"; + public static final String MYPROXY_LIFE = "myproxy.life"; + /* + * SSH properties + */ + public static final String SSH_PRIVATE_KEY = "private.ssh.key"; + public static final String SSH_PUBLIC_KEY = "public.ssh.key"; + public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass"; + public static final String SSH_USER_NAME = "ssh.username"; + public static final String SSH_PASSWORD = "ssh.password"; + public static final String PROPERTY = "property"; + public static final String NAME = "name"; + public static final String VALUE = "value"; + public static final String OUTPUT_DATA_DIR = "output.location"; + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 65c305b..5a6d51d 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -25,7 +25,7 @@ import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.DBUtil; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; @@ -196,7 +196,7 @@ public class GFacUtils { String temp = null; while ((temp = instream.readLine()) != null) { buff.append(temp); - buff.append(Constants.NEWLINE); + buff.append(GFacConstants.NEWLINE); } return buff.toString(); } finally { @@ -214,7 +214,7 @@ public class GFacUtils { throws UnknownHostException { String localHost = InetAddress.getLocalHost().getCanonicalHostName(); return (localHost.equals(appHost) - || Constants.LOCALHOST.equals(appHost) || Constants._127_0_0_1 + || GFacConstants.LOCALHOST.equals(appHost) || GFacConstants._127_0_0_1 .equals(appHost)); } @@ -267,7 +267,7 @@ public class GFacUtils { jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier); - jobExecutionContext.getMonitorPublisher().publish(jobStatusChangeRequestEvent); + jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent); } catch (Exception e) { throw new GFacException("Error persisting job status" + e.getLocalizedMessage(), e); @@ -510,7 +510,7 @@ public class GFacUtils { * @throws InterruptedException */ public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception { - String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE; List<String> children = curatorClient.getChildren().forPath(experimentNode); for (String pickedChild : children) { String experimentPath = experimentNode + File.separator + pickedChild; @@ -729,11 +729,15 @@ public class GFacUtils { return false; } - public static void publishTaskStatus (JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state){ + public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskState state){ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), jobExecutionContext.getGatewayID()); publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity)); } + + public static String getZKGfacServersParentPath() { + return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java new file mode 100644 index 0000000..2219f3a --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacWorker.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.core; + +import org.apache.airavata.gfac.core.context.ProcessContext; + +public class GFacWorker implements Runnable { + + + public GFacWorker(ProcessContext processContext) { + + } + + @Override + public void run() { + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java index 73538b0..a396861 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/RequestData.java @@ -90,7 +90,7 @@ public class RequestData { public String getMyProxyServerUrl() throws ApplicationSettingsException { if (myProxyServerUrl == null) { - myProxyServerUrl = ServerSettings.getSetting(Constants.MYPROXY_SERVER); + myProxyServerUrl = ServerSettings.getSetting(GFacConstants.MYPROXY_SERVER); } return myProxyServerUrl; } @@ -102,7 +102,7 @@ public class RequestData { public int getMyProxyPort() { if (myProxyPort == 0) { - String sPort = ServerSettings.getSetting(Constants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT)); + String sPort = ServerSettings.getSetting(GFacConstants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT)); myProxyPort = Integer.parseInt(sPort); } @@ -115,7 +115,7 @@ public class RequestData { public String getMyProxyUserName() throws ApplicationSettingsException { if (myProxyUserName == null) { - myProxyUserName = ServerSettings.getSetting(Constants.MYPROXY_USER); + myProxyUserName = ServerSettings.getSetting(GFacConstants.MYPROXY_USER); } return myProxyUserName; @@ -128,14 +128,14 @@ public class RequestData { public String getMyProxyPassword() throws ApplicationSettingsException { if (myProxyPassword == null) { - myProxyPassword = ServerSettings.getSetting(Constants.MYPROXY_PASS); + myProxyPassword = ServerSettings.getSetting(GFacConstants.MYPROXY_PASS); } return myProxyPassword; } public int getMyProxyLifeTime() { - String life = ServerSettings.getSetting(Constants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime)); + String life = ServerSettings.getSetting(GFacConstants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime)); myProxyLifeTime = Integer.parseInt(life); return myProxyLifeTime; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java index dc5ede8..606c385 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/Scheduler.java @@ -101,7 +101,7 @@ public class Scheduler { String providerClassName = null; try { aClass = GFacConfiguration.getProviderConfig(handlerDoc, - Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); + GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", GFacConstants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); // This should be have a single element only. if (aClass != null && !aClass.isEmpty()) { s = aClass.get(0); @@ -136,13 +136,13 @@ public class Scheduler { unicoreSubmission = appCatalog.getComputeResource().getUNICOREJobSubmission(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId()); securityProtocol = unicoreSubmission.getSecurityProtocol().toString(); } - List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']"); + List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']"); for (Element element : elements) { - String security = element.getAttribute(Constants.GFAC_CONFIG_SECURITY_ATTRIBUTE); + String security = element.getAttribute(GFacConstants.GFAC_CONFIG_SECURITY_ATTRIBUTE); if (security.equals("")) { - providerClassName = element.getAttribute(Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + providerClassName = element.getAttribute(GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); }else if (securityProtocol != null && securityProtocol.equals(security)) { - providerClassName = element.getAttribute(Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); + providerClassName = element.getAttribute(GFacConstants.GFAC_CONFIG_CLASS_ATTRIBUTE); } } if (providerClassName == null) { @@ -152,8 +152,8 @@ public class Scheduler { Class<? extends GFacProvider> aClass1 = Class.forName(providerClassName).asSubclass(GFacProvider.class); provider = aClass1.newInstance(); //loading the provider properties - aClass = GFacConfiguration.getProviderConfig(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + - providerClassName + "']", Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); + aClass = GFacConfiguration.getProviderConfig(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_HANDLERS_START + + providerClassName + "']", GFacConstants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); if (!aClass.isEmpty()) { provider.initProperties(aClass.get(0).getProperties()); } @@ -200,18 +200,18 @@ public class Scheduler { String executionMode = "sync"; try { executionMode = GFacConfiguration.getAttributeValue(handlerDoc, - Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); + GFacConstants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", GFacConstants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); // This should be have a single element only. if (executionMode == null || "".equals(executionMode)) { String hostClass = jobExecutionContext.getPreferredJobSubmissionProtocol().toString(); - executionMode = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); + executionMode = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", GFacConstants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); } if (executionMode == null || "".equals(executionMode)) { - List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']"); + List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), GFacConstants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']"); for (Element element : elements) { - executionMode = element.getAttribute(Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); + executionMode = element.getAttribute(GFacConstants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index 8183dec..b240901 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -27,10 +27,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.AppCatalogException; -import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.gfac.core.GFacConfiguration; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.SecurityContext; @@ -144,7 +144,7 @@ public class JobExecutionContext extends AbstractContext implements Serializable private String status; private List<String> outputFileList; private ExperimentCatalog experimentCatalog; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; public String getGatewayID() { return gatewayID; @@ -486,11 +486,11 @@ public class JobExecutionContext extends AbstractContext implements Serializable this.loginUserName = loginUserName; } - public MonitorPublisher getMonitorPublisher() { - return monitorPublisher; + public LocalEventPublisher getLocalEventPublisher() { + return localEventPublisher; } - public void setMonitorPublisher(MonitorPublisher monitorPublisher) { - this.monitorPublisher = monitorPublisher; + public void setLocalEventPublisher(LocalEventPublisher localEventPublisher) { + this.localEventPublisher = localEventPublisher; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java new file mode 100644 index 0000000..4cab291 --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.core.context; + +import org.apache.airavata.common.utils.LocalEventPublisher; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.curator.framework.CuratorFramework; + +public class ProcessContext { + // process model + private ExperimentCatalog experimentCatalog; + private AppCatalog appCatalog; + private CuratorFramework curatorClient; + private LocalEventPublisher localEventPublisher; + private final String processId; + private final String gatewayId; + private final String tokenId; + + public ProcessContext(String processId, String gatewayId, String tokenId) { + this.processId = processId; + this.gatewayId = gatewayId; + this.tokenId = tokenId; + } + + // Getters and Setters + public ExperimentCatalog getExperimentCatalog() { + return experimentCatalog; + } + + public void setExperimentCatalog(ExperimentCatalog experimentCatalog) { + this.experimentCatalog = experimentCatalog; + } + + public AppCatalog getAppCatalog() { + return appCatalog; + } + + public void setAppCatalog(AppCatalog appCatalog) { + this.appCatalog = appCatalog; + } + + public String getGatewayId() { + return gatewayId; + } + + public String getTokenId() { + return tokenId; + } + + public String getProcessId() { + return processId; + } + + public CuratorFramework getCuratorClient() { + return curatorClient; + } + + public void setCuratorClient(CuratorFramework curatorClient) { + this.curatorClient = curatorClient; + } + + public LocalEventPublisher getLocalEventPublisher() { + return localEventPublisher; + } + + public void setLocalEventPublisher(LocalEventPublisher localEventPublisher) { + this.localEventPublisher = localEventPublisher; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java index fcfd7f1..6a28986 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java @@ -20,7 +20,7 @@ */ package org.apache.airavata.gfac.core.handler; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.states.GfacHandlerState; import org.apache.airavata.gfac.core.GFacUtils; @@ -39,11 +39,11 @@ public abstract class AbstractHandler implements GFacHandler { private static final Logger logger = LoggerFactory.getLogger(AbstractHandler.class); protected ExperimentCatalog experimentCatalog = null; - protected MonitorPublisher publisher = null; + protected LocalEventPublisher publisher = null; public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { try { - publisher = jobExecutionContext.getMonitorPublisher(); + publisher = jobExecutionContext.getLocalEventPublisher(); GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED); } catch (Exception e) { logger.error("Error saving Recoverable provider state", e); http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java index 5b1e3a2..36cb84f 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java @@ -25,7 +25,7 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.credential.store.credential.Credential; import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; import org.apache.airavata.credential.store.store.CredentialReader; -import org.apache.airavata.gfac.core.Constants; +import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.RequestData; import org.apache.airavata.gfac.core.GFacUtils; @@ -79,13 +79,13 @@ public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo { log.info("Current directory " + f.getAbsolutePath()); throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath); } else { - System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath()); + System.setProperty(GFacConstants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath()); } } private static void setUpTrustedCertificatePath() throws ApplicationSettingsException { - String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION); + String trustedCertificatePath = ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION); setUpTrustedCertificatePath(trustedCertificatePath); } @@ -94,7 +94,7 @@ public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo { this.credentialReader = credentialReader; this.requestData = requestData; try { - properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION)); + properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION)); } catch (ApplicationSettingsException e) { log.error("Error while reading server properties", e); }; @@ -103,7 +103,7 @@ public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo { public TokenizedMyProxyAuthInfo(RequestData requestData) { this.requestData = requestData; try { - properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION)); + properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(GFacConstants.TRUSTED_CERT_LOCATION)); } catch (ApplicationSettingsException e) { log.error("Error while reading server properties", e); }; http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java index 46659ff..eee66c2 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java @@ -22,7 +22,7 @@ package org.apache.airavata.gfac.impl; import com.google.common.eventbus.Subscribe; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.Publisher; @@ -43,7 +43,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class); private ExperimentCatalog airavataExperimentCatalog; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private Publisher publisher; @@ -71,7 +71,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { logger.debug("expId - {}: Publishing job status for " + jobStatus.getJobIdentity().getJobId() + ":" + state.toString(),jobStatus.getJobIdentity().getExperimentId()); JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()); - monitorPublisher.publish(event); + localEventPublisher.publish(event); String messageId = AiravataUtils.getId("JOB"); MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId, jobStatus.getJobIdentity().getGatewayId()); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); @@ -110,8 +110,8 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { for (Object configuration : configurations) { if (configuration instanceof ExperimentCatalog){ this.airavataExperimentCatalog =(ExperimentCatalog)configuration; - } else if (configuration instanceof MonitorPublisher){ - this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof LocalEventPublisher){ + this.localEventPublisher =(LocalEventPublisher) configuration; } else if (configuration instanceof Publisher){ this.publisher=(Publisher) configuration; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java index 7cfa7ca..e64952c 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java @@ -23,7 +23,7 @@ package org.apache.airavata.gfac.impl; import com.google.common.eventbus.Subscribe; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.Publisher; @@ -45,7 +45,7 @@ import java.util.Calendar; public class AiravataTaskStatusUpdator implements AbstractActivityListener { private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class); private ExperimentCatalog airavataExperimentCatalog; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private Publisher publisher; public ExperimentCatalog getAiravataExperimentCatalog() { @@ -63,7 +63,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { logger.debug("expId - {}: Publishing task status for " + taskStatus.getTaskIdentity().getTaskId() + ":" + taskStatus.getState().toString(), taskStatus.getTaskIdentity().getExperimentId()); TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity()); - monitorPublisher.publish(event); + localEventPublisher.publish(event); String messageId = AiravataUtils.getId("TASK"); MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId()); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); @@ -107,7 +107,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { jobStatus.getJobIdentity().getExperimentId(), jobStatus.getJobIdentity().getGatewayId()); TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity); - monitorPublisher.publish(event); + localEventPublisher.publish(event); String messageId = AiravataUtils.getId("TASK"); MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId()); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); @@ -144,8 +144,8 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { for (Object configuration : configurations) { if (configuration instanceof ExperimentCatalog){ this.airavataExperimentCatalog =(ExperimentCatalog)configuration; - } else if (configuration instanceof MonitorPublisher){ - this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof LocalEventPublisher){ + this.localEventPublisher =(LocalEventPublisher) configuration; } else if (configuration instanceof Publisher){ this.publisher=(Publisher) configuration; } http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java index ddec551..53c29d1 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java @@ -22,7 +22,7 @@ package org.apache.airavata.gfac.impl; import com.google.common.eventbus.Subscribe; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.Publisher; @@ -44,7 +44,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class); private ExperimentCatalog airavataExperimentCatalog; - private MonitorPublisher monitorPublisher; + private LocalEventPublisher localEventPublisher; private Publisher publisher; @@ -88,7 +88,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen taskStatus.getTaskIdentity().getExperimentId(), taskStatus.getTaskIdentity().getGatewayId()); WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity); - monitorPublisher.publish(event); + localEventPublisher.publish(event); String messageId = AiravataUtils.getId("WFNODE"); MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, taskStatus.getTaskIdentity().getGatewayId()); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); @@ -119,8 +119,8 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen for (Object configuration : configurations) { if (configuration instanceof ExperimentCatalog){ this.airavataExperimentCatalog =(ExperimentCatalog)configuration; - } else if (configuration instanceof MonitorPublisher){ - this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof LocalEventPublisher){ + this.localEventPublisher =(LocalEventPublisher) configuration; } else if (configuration instanceof Publisher){ this.publisher=(Publisher) configuration; }
