http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java new file mode 100644 index 0000000..14fd7fe --- /dev/null +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java @@ -0,0 +1,55 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.9.1) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.airavata.gfac.cpi; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class gfac_cpi_serviceConstants { + + public static final String GFAC_CPI_VERSION = "0.13.0"; + +}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java new file mode 100644 index 0000000..b076145 --- /dev/null +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.server; + +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.IServer; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.cpi.GfacService; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +public class GfacServer implements IServer{ + + private final static Logger logger = LoggerFactory.getLogger(GfacServer.class); + private static final String SERVER_NAME = "Gfac Server"; + private static final String SERVER_VERSION = "1.0"; + + private IServer.ServerStatus status; + + private TServer server; + + public GfacServer() { + setStatus(IServer.ServerStatus.STOPPED); + } + + public void StartGfacServer(GfacService.Processor<GfacServerHandler> gfacServerHandlerProcessor) + throws Exception { + try { + final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950")); + final String serverHost = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST, null); + + InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort); + + TServerTransport serverTransport = new TServerSocket(inetSocketAddress); + + server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(gfacServerHandlerProcessor)); + + new Thread() { + public void run() { + server.serve(); + setStatus(IServer.ServerStatus.STOPPED); + logger.info("Gfac Server Stopped."); + } + }.start(); + new Thread() { + public void run() { + while(!server.isServing()){ + try { + Thread.sleep(500); + } catch (InterruptedException e) { + break; + } + } + if (server.isServing()){ + setStatus(IServer.ServerStatus.STARTED); + logger.info("Starting Gfac Server on Port " + serverPort); + logger.info("Listening to Gfac Clients ...."); + } + } + }.start(); + } catch (TTransportException e) { + logger.error(e.getMessage()); + setStatus(IServer.ServerStatus.FAILED); + } + } + + public static void main(String[] args) { + try { + new GfacServer().start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + public void start() throws Exception { + setStatus(IServer.ServerStatus.STARTING); + GfacService.Processor<GfacServerHandler> gfacService = + new GfacService.Processor<GfacServerHandler>(new GfacServerHandler()); + StartGfacServer(gfacService); + } + + public void stop() throws Exception { + if (server!=null && server.isServing()){ + setStatus(IServer.ServerStatus.STOPING); + server.stop(); + } + GFacThreadPoolExecutor.getCachedThreadPool().shutdownNow(); + + } + + public void restart() throws Exception { + stop(); + start(); + } + + public void configure() throws Exception { + // TODO Auto-generated method stub + + } + + public IServer.ServerStatus getStatus() throws Exception { + return status; + } + + private void setStatus(IServer.ServerStatus stat){ + status=stat; + status.updateTime(); + } + + public String getName() { + return SERVER_NAME; + } + + public String getVersion() { + return SERVER_VERSION; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java new file mode 100644 index 0000000..77a89cc --- /dev/null +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -0,0 +1,421 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.server; + +import com.google.common.eventbus.EventBus; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.logger.AiravataLogger; +import org.apache.airavata.common.logger.AiravataLoggerFactory; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.gfac.GFacConfiguration; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; +import org.apache.airavata.gfac.core.GFac; +import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.handler.ThreadedHandler; +import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.utils.InputHandlerWorker; +import org.apache.airavata.gfac.cpi.GfacService; +import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.PublisherFactory; +import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskSubmitEvent; +import org.apache.airavata.model.messaging.event.TaskTerminateEvent; +import org.apache.airavata.model.workspace.experiment.ExperimentState; +import org.apache.airavata.model.workspace.experiment.ExperimentStatus; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; +import org.xml.sax.SAXException; + +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPathExpressionException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +public class GfacServerHandler implements GfacService.Iface { + private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class); + private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + private static int requestCount=0; + private Registry registry; + private AppCatalog appCatalog; + private String gatewayName; + private String airavataUserName; + private CuratorFramework curatorClient; + private MonitorPublisher publisher; + private String gfacServer; + private String gfacExperiments; + private String airavataServerHostPort; + private BlockingQueue<TaskSubmitEvent> taskSubmitEvents; + private static File gfacConfigFile; + private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>(); + private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>(); + + public GfacServerHandler() throws Exception { + try { + // start curator client + String zkhostPort = AiravataZKUtils.getZKhostPort(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); + curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy); + curatorClient.start(); + gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST) + + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT); + storeServerConfig(); + publisher = new MonitorPublisher(new EventBus()); + registry = RegistryFactory.getDefaultRegistry(); + appCatalog = AppCatalogFactory.getAppCatalog(); + setGatewayProperties(); + startDaemonHandlers(); + // initializing Better Gfac Instance + BetterGfacImpl.getInstance().init(registry, appCatalog, curatorClient, publisher); + if (ServerSettings.isGFacPassiveMode()) { + rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); + } + startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer); + + } catch (Exception e) { + throw new Exception("Error initialising GFAC", e); + } + } + + public static void main(String[] args) { + RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null; + try { + rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + rabbitMQTaskLaunchConsumer.listen(new TestHandler()); + } catch (AiravataException e) { + logger.error(e.getMessage(), e); + } + } + private void storeServerConfig() throws Exception { + Stat stat = curatorClient.checkExists().forPath(gfacServer); + if (stat == null) { + curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath(gfacServer, new byte[0]); + } + String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); + String instanceNode = gfacServer + File.separator + instanceId; + stat = curatorClient.checkExists().forPath(instanceNode); + if (stat == null) { + curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes()); + curatorClient.getChildren().watched().forPath(instanceNode); + } + stat = curatorClient.checkExists().forPath(gfacExperiments); + if (stat == null) { + curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes()); + } + stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId); + if (stat == null) { + curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes()); + } + } + + private long ByateArrayToLong(byte[] data) { + long value = 0; + for (int i = 0; i < data.length; i++) + { + value += ((long) data[i] & 0xffL) << (8 * i); + } + return value; + } + + public String getGFACServiceVersion() throws TException { + return gfac_cpi_serviceConstants.GFAC_CPI_VERSION; + } + + /** + * * After creating the experiment Data and Task Data in the orchestrator + * * Orchestrator has to invoke this operation for each Task per experiment to run + * * the actual Job related actions. + * * + * * @param experimentID + * * @param taskID + * * @param gatewayId: + * * The GatewayId is inferred from security context and passed onto gfac. + * * @return sucess/failure + * * + * * + * + * @param experimentId + * @param taskId + * @param gatewayId + */ + public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { + requestCount++; + logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------"); + logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId); + InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId, + taskId, gatewayId, tokenId); +// try { +// if( gfac.submitJob(experimentId, taskId, gatewayId)){ + logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " + + "{}", experimentId, taskId, gatewayId); + + GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker); + + // we immediately return when we have a threadpool + return true; + } + + public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { + logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); + try { + if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) { + logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId); + return true; + } else { + logger.errorId(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId); + return false; + } + } catch (Exception e) { + logger.errorId(experimentId, "Error cancelling the experiment {}.", experimentId); + throw new TException("Error cancelling the experiment : " + e.getMessage(), e); + } + } + + public Registry getRegistry() { + return registry; + } + + public void setRegistry(Registry registry) { + this.registry = registry; + } + + public String getGatewayName() { + return gatewayName; + } + + public void setGatewayName(String gatewayName) { + this.gatewayName = gatewayName; + } + + public String getAiravataUserName() { + return airavataUserName; + } + + public void setAiravataUserName(String airavataUserName) { + this.airavataUserName = airavataUserName; + } + + protected void setGatewayProperties() throws ApplicationSettingsException { + setAiravataUserName(ServerSettings.getDefaultUser()); + setGatewayName(ServerSettings.getDefaultUserGateway()); + } + + private GFac getGfac() throws TException { + GFac gFac = BetterGfacImpl.getInstance(); + gFac.init(registry, appCatalog, curatorClient, publisher); + return gFac; + } + + public void startDaemonHandlers() { + List<GFacHandlerConfig> daemonHandlerConfig = null; + String className = null; + try { + URL resource = GfacServerHandler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); + if (resource != null) { + gfacConfigFile = new File(resource.getPath()); + } + daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile); + for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) { + className = handlerConfig.getClassName(); + Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class); + ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance(); + threadedHandler.initProperties(handlerConfig.getProperties()); + daemonHandlers.add(threadedHandler); + } + } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException | + InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) { + logger.error("Error parsing gfac-config.xml, double check the xml configuration", e); + } + for (ThreadedHandler tHandler : daemonHandlers) { + (new Thread(tHandler)).start(); + } + } + + + public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher, + + RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) { + try { + String[] listenerClassList = ServerSettings.getActivityListeners(); + Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher(); + for (String listenerClass : listenerClassList) { + Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); + AbstractActivityListener abstractActivityListener = aClass.newInstance(); + activityListeners.add(abstractActivityListener); + abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer); + logger.info("Registering listener: " + listenerClass); + publisher.registerListener(abstractActivityListener); + } + } catch (Exception e) { + logger.error("Error loading the listener classes configured in airavata-server.properties", e); + } + } + private static class TestHandler implements MessageHandler{ + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + ArrayList<String> keys = new ArrayList<String>(); + keys.add(ServerSettings.getLaunchQueueName()); + keys.add(ServerSettings.getCancelQueueName()); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys); + props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName()); + return props; + } + + @Override + public void onMessage(MessageContext message) { + TaskSubmitEvent event = new TaskSubmitEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = new byte[0]; + try { + bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(event.getExperimentId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + } + } + + private class TaskLaunchMessageHandler implements MessageHandler { + private String experimentNode; + private String nodeName; + + public TaskLaunchMessageHandler() { + experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0"); + } + + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + ArrayList<String> keys = new ArrayList<String>(); + keys.add(ServerSettings.getLaunchQueueName()); + keys.add(ServerSettings.getCancelQueueName()); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys); + props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName()); + return props; + } + + public void onMessage(MessageContext message) { + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); + if (message.getType().equals(MessageType.LAUNCHTASK)) { + try { + TaskSubmitEvent event = new TaskSubmitEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + // update experiment status to executing + ExperimentStatus status = new ExperimentStatus(); + status.setExperimentState(ExperimentState.EXECUTING); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + registry.update(RegistryModelType.EXPERIMENT_STATUS, status, event.getExperimentId()); + experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + try { + GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient, + experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); + AiravataZKUtils.getExpStatePath(event.getExperimentId()); + submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } + } catch (TException e) { + logger.error(e.getMessage(), e); //nobody is listening so nothing to throw + } catch (RegistryException e) { + logger.error("Error while updating experiment status", e); + } + } else if (message.getType().equals(MessageType.TERMINATETASK)) { + boolean cancelSuccess = false; + TaskTerminateEvent event = new TaskTerminateEvent(); + TBase messageEvent = message.getEvent(); + try { + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, message.getDeliveryTag()); + if (saveDeliveryTagSuccess) { + cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); + } else { + throw new GFacException("Terminate Task fail to save delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" + + "This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled."); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + }finally { + if (cancelSuccess) { + // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message. + } else { + try { + if (GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) { + if (!rabbitMQTaskLaunchConsumer.isOpen()) { + rabbitMQTaskLaunchConsumer.reconnect(); + } + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } + } catch (Exception e) { + logger.error("Error while ack to cancel request, experimentId: " + event.getExperimentId()); + } + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/main/resources/gsissh.properties ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/resources/gsissh.properties b/modules/gfac/gfac-service/src/main/resources/gsissh.properties new file mode 100644 index 0000000..3fdf76d --- /dev/null +++ b/modules/gfac/gfac-service/src/main/resources/gsissh.properties @@ -0,0 +1,26 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +########################################################################### +# Specifies system level configurations as a key/value pairs. +########################################################################### + +StrictHostKeyChecking=no +ssh.session.timeout=360000 http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java new file mode 100644 index 0000000..21c137f --- /dev/null +++ b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/GfacClientFactoryTest.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ + +package org.apache.airavata.gfac.client; + +//import org.apache.airavata.client.AiravataAPIFactory; +//import org.apache.airavata.client.api.AiravataAPI; +//import org.apache.airavata.client.api.exception.AiravataAPIInvocationException; +//import org.apache.airavata.client.tools.DocumentCreator; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.client.util.Initialize; +import org.apache.airavata.gfac.cpi.GfacService; +import org.apache.airavata.gfac.server.GfacServer; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; + +public class GfacClientFactoryTest { + private final static Logger logger = LoggerFactory.getLogger(GfacClientFactoryTest.class); +// private DocumentCreator documentCreator; + private GfacService.Client gfacClient; + private Registry registry; + private int NUM_CONCURRENT_REQUESTS = 1; + Initialize initialize; + GfacServer service; + private static ServerCnxnFactory cnxnFactory; +/* + @Test + public void setUp() { + AiravataUtils.setExecutionAsServer(); + initialize = new Initialize("registry-derby.sql"); + initialize.initializeDB(); + AiravataZKUtils.startEmbeddedZK(cnxnFactory); + try { + service = (new GfacServer()); + service.start(); + registry = RegistryFactory.getDefaultRegistry(); + } catch (Exception e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + AiravataUtils.setExecutionAsServer(); + documentCreator = new DocumentCreator(getAiravataAPI()); + documentCreator.createLocalHostDocs(); + + try { + service.stop(); + cnxnFactory.shutdown(); + } catch (Exception e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } + + private AiravataAPI getAiravataAPI() { + AiravataAPI airavataAPI = null; + try { + String systemUserName = ServerSettings.getDefaultUser(); + String gateway = ServerSettings.getDefaultUserGateway(); + airavataAPI = AiravataAPIFactory.getAPI(gateway, systemUserName); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (AiravataAPIInvocationException e) { + e.printStackTrace(); + } + return airavataAPI; + } +*/ + + private void storeDescriptors() { + + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java new file mode 100644 index 0000000..651f414 --- /dev/null +++ b/modules/gfac/gfac-service/src/test/java/org/apache/airavata/gfac/client/util/Initialize.java @@ -0,0 +1,330 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.client.util; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.persistance.registry.jpa.ResourceType; +import org.apache.airavata.persistance.registry.jpa.resources.*; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.derby.drda.NetworkServerControl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.sql.*; +import java.util.StringTokenizer; + +public class Initialize { + private static final Logger logger = LoggerFactory.getLogger(Initialize.class); + public static final String DERBY_SERVER_MODE_SYS_PROPERTY = "derby.drda.startNetworkServer"; + public String scriptName = "registry-derby.sql"; + private NetworkServerControl server; + private static final String delimiter = ";"; + public static final String PERSISTANT_DATA = "Configuration"; + + public Initialize(String scriptName) { + this.scriptName = scriptName; + } + + public static boolean checkStringBufferEndsWith(StringBuffer buffer, String suffix) { + if (suffix.length() > buffer.length()) { + return false; + } + // this loop is done on purpose to avoid memory allocation performance + // problems on various JDKs + // StringBuffer.lastIndexOf() was introduced in jdk 1.4 and + // implementation is ok though does allocation/copying + // StringBuffer.toString().endsWith() does massive memory + // allocation/copying on JDK 1.5 + // See http://issues.apache.org/bugzilla/show_bug.cgi?id=37169 + int endIndex = suffix.length() - 1; + int bufferIndex = buffer.length() - 1; + while (endIndex >= 0) { + if (buffer.charAt(bufferIndex) != suffix.charAt(endIndex)) { + return false; + } + bufferIndex--; + endIndex--; + } + return true; + } + + private static boolean isServerStarted(NetworkServerControl server, int ntries) + { + for (int i = 1; i <= ntries; i ++) + { + try { + Thread.sleep(500); + server.ping(); + return true; + } + catch (Exception e) { + if (i == ntries) + return false; + } + } + return false; + } + + public void initializeDB() throws SQLException{ + String jdbcUrl = null; + String jdbcUser = null; + String jdbcPassword = null; + try{ + jdbcUrl = ServerSettings.getSetting("registry.jdbc.url"); + jdbcUser = ServerSettings.getSetting("registry.jdbc.user"); + jdbcPassword = ServerSettings.getSetting("registry.jdbc.password"); + jdbcUrl = jdbcUrl + "?" + "user=" + jdbcUser + "&" + "password=" + jdbcPassword; + } catch (ApplicationSettingsException e) { + logger.error("Unable to read properties", e); + } + startDerbyInServerMode(); + if(!isServerStarted(server, 20)){ + throw new RuntimeException("Derby server cound not started within five seconds..."); + } + + Connection conn = null; + try { + Class.forName(Utils.getJDBCDriver()).newInstance(); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); + if (!isDatabaseStructureCreated(PERSISTANT_DATA, conn)) { + executeSQLScript(conn); + logger.info("New Database created for Registry"); + } else { + logger.debug("Database already created for Registry!"); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("Database failure", e); + } finally { + try { + if (conn != null){ + if (!conn.getAutoCommit()) { + conn.commit(); + } + conn.close(); + } + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + try{ + GatewayResource gatewayResource = new GatewayResource(); + gatewayResource.setGatewayId(ServerSettings.getSetting("default.registry.gateway")); + gatewayResource.setGatewayName(ServerSettings.getSetting("default.registry.gateway")); + gatewayResource.setDomain("test-domain"); + gatewayResource.setEmailAddress("test-email"); + gatewayResource.save(); + + UserResource userResource = new UserResource(); + userResource.setUserName(ServerSettings.getSetting("default.registry.user")); + userResource.setPassword(ServerSettings.getSetting("default.registry.password")); + userResource.save(); + + WorkerResource workerResource = (WorkerResource) gatewayResource.create(ResourceType.GATEWAY_WORKER); + workerResource.setUser(userResource.getUserName()); + workerResource.save(); + + ProjectResource projectResource = (ProjectResource)workerResource.create(ResourceType.PROJECT); + projectResource.setGatewayId(gatewayResource.getGatewayId()); + projectResource.setId("default"); + projectResource.setName("default"); + projectResource.setWorker(workerResource); + projectResource.save(); + + + } catch (ApplicationSettingsException e) { + logger.error("Unable to read properties", e); + throw new SQLException(e.getMessage(), e); + } catch (RegistryException e) { + logger.error("Unable to save data to registry", e); + throw new SQLException(e.getMessage(), e); + } + } + + public static boolean isDatabaseStructureCreated(String tableName, Connection conn) { + try { + System.out.println("Running a query to test the database tables existence."); + // check whether the tables are already created with a query + Statement statement = null; + try { + statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("select * from " + tableName); + if (rs != null) { + rs.close(); + } + } finally { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + return false; + } + } + } catch (SQLException e) { + return false; + } + + return true; + } + + private void executeSQLScript(Connection conn) throws Exception { + StringBuffer sql = new StringBuffer(); + BufferedReader reader = null; + try{ + + InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(scriptName); + reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("//")) { + continue; + } + if (line.startsWith("--")) { + continue; + } + StringTokenizer st = new StringTokenizer(line); + if (st.hasMoreTokens()) { + String token = st.nextToken(); + if ("REM".equalsIgnoreCase(token)) { + continue; + } + } + sql.append(" ").append(line); + + // SQL defines "--" as a comment to EOL + // and in Oracle it may contain a hint + // so we cannot just remove it, instead we must end it + if (line.indexOf("--") >= 0) { + sql.append("\n"); + } + if ((checkStringBufferEndsWith(sql, delimiter))) { + executeSQL(sql.substring(0, sql.length() - delimiter.length()), conn); + sql.replace(0, sql.length(), ""); + } + } + // Catch any statements not followed by ; + if (sql.length() > 0) { + executeSQL(sql.toString(), conn); + } + }catch (IOException e){ + logger.error("Error occurred while executing SQL script for creating Airavata database", e); + throw new Exception("Error occurred while executing SQL script for creating Airavata database", e); + }finally { + if (reader != null) { + reader.close(); + } + + } + + } + + private static void executeSQL(String sql, Connection conn) throws Exception { + // Check and ignore empty statements + if ("".equals(sql.trim())) { + return; + } + + Statement statement = null; + try { + logger.debug("SQL : " + sql); + + boolean ret; + int updateCount = 0, updateCountTotal = 0; + statement = conn.createStatement(); + ret = statement.execute(sql); + updateCount = statement.getUpdateCount(); + do { + if (!ret) { + if (updateCount != -1) { + updateCountTotal += updateCount; + } + } + ret = statement.getMoreResults(); + if (ret) { + updateCount = statement.getUpdateCount(); + } + } while (ret); + + logger.debug(sql + " : " + updateCountTotal + " rows affected"); + + SQLWarning warning = conn.getWarnings(); + while (warning != null) { + logger.warn(warning + " sql warning"); + warning = warning.getNextWarning(); + } + conn.clearWarnings(); + } catch (SQLException e) { + if (e.getSQLState().equals("X0Y32")) { + // eliminating the table already exception for the derby + // database + logger.info("Table Already Exists", e); + } else { + throw new Exception("Error occurred while executing : " + sql, e); + } + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + logger.error("Error occurred while closing result set.", e); + } + } + } + } + + private void startDerbyInServerMode() { + try { + System.setProperty(DERBY_SERVER_MODE_SYS_PROPERTY, "true"); + server = new NetworkServerControl(InetAddress.getByName(Utils.getHost()), + 20000, + Utils.getJDBCUser(), Utils.getJDBCPassword()); + java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true); + server.start(consoleWriter); + } catch (IOException e) { + logger.error("Unable to start Apache derby in the server mode! Check whether " + + "specified port is available"); + } catch (Exception e) { + logger.error("Unable to start Apache derby in the server mode! Check whether " + + "specified port is available"); + } + + } + + public void stopDerbyServer() throws SQLException{ + try { + server.shutdown(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new SQLException("Error while stopping derby server", e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/gsissh.properties ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/resources/gsissh.properties b/modules/gfac/gfac-service/src/test/resources/gsissh.properties new file mode 100644 index 0000000..3fdf76d --- /dev/null +++ b/modules/gfac/gfac-service/src/test/resources/gsissh.properties @@ -0,0 +1,26 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +########################################################################### +# Specifies system level configurations as a key/value pairs. +########################################################################### + +StrictHostKeyChecking=no +ssh.session.timeout=360000 http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/monitor.properties ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/resources/monitor.properties b/modules/gfac/gfac-service/src/test/resources/monitor.properties new file mode 100644 index 0000000..7f0299a --- /dev/null +++ b/modules/gfac/gfac-service/src/test/resources/monitor.properties @@ -0,0 +1,30 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +primaryMonitor=org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor +secondaryMonitor=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor +amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org +connection.name=xsede_private +trusted.certificate.location=/Users/chathuri/dev/airavata/cert/certificates +certificate.path=/Users/chathuri/dev/airavata/cert/certificates +myproxy.server=myproxy.teragrid.org +myproxy.user=ogce +myproxy.password= +myproxy.life=3600 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/orchestrator.properties ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/resources/orchestrator.properties b/modules/gfac/gfac-service/src/test/resources/orchestrator.properties new file mode 100644 index 0000000..35c0427 --- /dev/null +++ b/modules/gfac/gfac-service/src/test/resources/orchestrator.properties @@ -0,0 +1,26 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter +job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator +submitter.interval=10000 +threadpool.size=0 +start.submitter=true +embedded.mode=true +enable.validation=false http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/registry-derby.sql ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/resources/registry-derby.sql b/modules/gfac/gfac-service/src/test/resources/registry-derby.sql new file mode 100644 index 0000000..9ed5ca9 --- /dev/null +++ b/modules/gfac/gfac-service/src/test/resources/registry-derby.sql @@ -0,0 +1,361 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +CREATE TABLE GATEWAY +( + GATEWAY_NAME VARCHAR(255), + OWNER VARCHAR(255), + PRIMARY KEY (GATEWAY_NAME) +); + +CREATE TABLE CONFIGURATION +( + CONFIG_KEY VARCHAR(255), + CONFIG_VAL VARCHAR(255), + EXPIRE_DATE TIMESTAMP DEFAULT '0000-00-00 00:00:00', + CATEGORY_ID VARCHAR (255), + PRIMARY KEY(CONFIG_KEY, CONFIG_VAL, CATEGORY_ID) +); + +INSERT INTO CONFIGURATION (CONFIG_KEY, CONFIG_VAL, EXPIRE_DATE, CATEGORY_ID) VALUES('registry.version', '0.12', CURRENT_TIMESTAMP ,'SYSTEM'); + +CREATE TABLE USERS +( + USER_NAME VARCHAR(255), + PASSWORD VARCHAR(255), + PRIMARY KEY(USER_NAME) +); + +CREATE TABLE GATEWAY_WORKER +( + GATEWAY_NAME VARCHAR(255), + USER_NAME VARCHAR(255), + PRIMARY KEY (GATEWAY_NAME, USER_NAME), + FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE, + FOREIGN KEY (USER_NAME) REFERENCES USERS(USER_NAME) ON DELETE CASCADE +); + +CREATE TABLE PROJECT +( + GATEWAY_NAME VARCHAR(255), + USER_NAME VARCHAR(255) NOT NULL, + PROJECT_ID VARCHAR(255), + PROJECT_NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(255), + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (PROJECT_ID), + FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE, + FOREIGN KEY (USER_NAME) REFERENCES USERS(USER_NAME) ON DELETE CASCADE +); + +CREATE TABLE PROJECT_USER +( + PROJECT_ID VARCHAR(255), + USER_NAME VARCHAR(255), + PRIMARY KEY (PROJECT_ID,USER_NAME), + FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE, + FOREIGN KEY (USER_NAME) REFERENCES USERS(USER_NAME) ON DELETE CASCADE +); + +CREATE TABLE PUBLISHED_WORKFLOW +( + GATEWAY_NAME VARCHAR(255), + CREATED_USER VARCHAR(255), + PUBLISH_WORKFLOW_NAME VARCHAR(255), + VERSION VARCHAR(255), + PUBLISHED_DATE TIMESTAMP DEFAULT '0000-00-00 00:00:00', + PATH VARCHAR (255), + WORKFLOW_CONTENT BLOB, + PRIMARY KEY(GATEWAY_NAME, PUBLISH_WORKFLOW_NAME), + FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE, + FOREIGN KEY (CREATED_USER) REFERENCES USERS(USER_NAME) ON DELETE CASCADE +); + +CREATE TABLE USER_WORKFLOW +( + GATEWAY_NAME VARCHAR(255), + OWNER VARCHAR(255), + TEMPLATE_NAME VARCHAR(255), + LAST_UPDATED_TIME TIMESTAMP DEFAULT CURRENT TIMESTAMP, + PATH VARCHAR (255), + WORKFLOW_GRAPH BLOB, + PRIMARY KEY(GATEWAY_NAME, OWNER, TEMPLATE_NAME), + FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE, + FOREIGN KEY (OWNER) REFERENCES USERS(USER_NAME) ON DELETE CASCADE +); + +CREATE TABLE EXPERIMENT +( + EXPERIMENT_ID VARCHAR(255), + GATEWAY_NAME VARCHAR(255), + EXECUTION_USER VARCHAR(255) NOT NULL, + PROJECT_ID VARCHAR(255) NOT NULL, + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + EXPERIMENT_NAME VARCHAR(255) NOT NULL, + EXPERIMENT_DESCRIPTION VARCHAR(255), + APPLICATION_ID VARCHAR(255), + APPLICATION_VERSION VARCHAR(255), + WORKFLOW_TEMPLATE_ID VARCHAR(255), + WORKFLOW_TEMPLATE_VERSION VARCHAR(255), + WORKFLOW_EXECUTION_ID VARCHAR(255), + PRIMARY KEY(EXPERIMENT_ID), + FOREIGN KEY (GATEWAY_NAME) REFERENCES GATEWAY(GATEWAY_NAME) ON DELETE CASCADE, + FOREIGN KEY (EXECUTION_USER) REFERENCES USERS(USER_NAME) ON DELETE CASCADE, + FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE +); + +CREATE TABLE EXPERIMENT_INPUT +( + EXPERIMENT_ID VARCHAR(255), + INPUT_KEY VARCHAR(255) NOT NULL, + INPUT_TYPE VARCHAR(255), + METADATA VARCHAR(255), + VALUE CLOB, + PRIMARY KEY(EXPERIMENT_ID,INPUT_KEY), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE +); + +CREATE TABLE EXPERIMENT_OUTPUT +( + EXPERIMENT_ID VARCHAR(255), + OUTPUT_KEY VARCHAR(255) NOT NULL, + OUTPUT_KEY_TYPE VARCHAR(255), + METADATA VARCHAR(255), + VALUE CLOB, + PRIMARY KEY(EXPERIMENT_ID,OUTPUT_KEY), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE +); + + +CREATE TABLE WORKFLOW_NODE_DETAIL +( + EXPERIMENT_ID VARCHAR(255) NOT NULL, + NODE_INSTANCE_ID VARCHAR(255), + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + NODE_NAME VARCHAR(255) NOT NULL, + EXECUTION_UNIT VARCHAR(255) NOT NULL, + EXECUTION_UNIT_DATA VARCHAR(255), + PRIMARY KEY(NODE_INSTANCE_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE +); + +CREATE TABLE TASK_DETAIL +( + TASK_ID VARCHAR(255), + NODE_INSTANCE_ID VARCHAR(255), + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + APPLICATION_ID VARCHAR(255), + APPLICATION_VERSION VARCHAR(255), + APPLICATION_DEPLOYMENT_ID VARCHAR(255), + PRIMARY KEY(TASK_ID), + FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE +); + +CREATE TABLE ERROR_DETAIL +( + ERROR_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY, + EXPERIMENT_ID VARCHAR(255), + TASK_ID VARCHAR(255), + NODE_INSTANCE_ID VARCHAR(255), + JOB_ID VARCHAR(255), + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + ACTUAL_ERROR_MESSAGE CLOB, + USER_FRIEDNLY_ERROR_MSG VARCHAR(255), + TRANSIENT_OR_PERSISTENT SMALLINT, + ERROR_CATEGORY VARCHAR(255), + CORRECTIVE_ACTION VARCHAR(255), + ACTIONABLE_GROUP VARCHAR(255), + PRIMARY KEY(ERROR_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE, + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE, + FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE +); + +CREATE TABLE APPLICATION_INPUT +( + TASK_ID VARCHAR(255), + INPUT_KEY VARCHAR(255) NOT NULL, + INPUT_KEY_TYPE VARCHAR(255), + METADATA VARCHAR(255), + VALUE CLOB, + PRIMARY KEY(TASK_ID,INPUT_KEY), + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE APPLICATION_OUTPUT +( + TASK_ID VARCHAR(255), + OUTPUT_KEY VARCHAR(255) NOT NULL, + OUTPUT_KEY_TYPE VARCHAR(255), + METADATA VARCHAR(255), + VALUE CLOB, + PRIMARY KEY(TASK_ID,OUTPUT_KEY), + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE NODE_INPUT +( + NODE_INSTANCE_ID VARCHAR(255), + INPUT_KEY VARCHAR(255) NOT NULL, + INPUT_KEY_TYPE VARCHAR(255), + METADATA VARCHAR(255), + VALUE VARCHAR(255), + PRIMARY KEY(NODE_INSTANCE_ID,INPUT_KEY), + FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE +); + +CREATE TABLE NODE_OUTPUT +( + NODE_INSTANCE_ID VARCHAR(255), + OUTPUT_KEY VARCHAR(255) NOT NULL, + OUTPUT_KEY_TYPE VARCHAR(255), + METADATA VARCHAR(255), + VALUE VARCHAR(255), + PRIMARY KEY(NODE_INSTANCE_ID,OUTPUT_KEY), + FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE +); + +CREATE TABLE JOB_DETAIL +( + JOB_ID VARCHAR(255), + TASK_ID VARCHAR(255), + JOB_DESCRIPTION CLOB NOT NULL, + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + COMPUTE_RESOURCE_CONSUMED VARCHAR(255), + PRIMARY KEY (TASK_ID, JOB_ID), + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE DATA_TRANSFER_DETAIL +( + TRANSFER_ID VARCHAR(255), + TASK_ID VARCHAR(255), + CREATION_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + TRANSFER_DESC CLOB NOT NULL, + PRIMARY KEY(TRANSFER_ID), + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE STATUS +( + STATUS_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY, + EXPERIMENT_ID VARCHAR(255), + NODE_INSTANCE_ID VARCHAR(255), + TRANSFER_ID VARCHAR(255), + TASK_ID VARCHAR(255), + JOB_ID VARCHAR(255), + STATE VARCHAR(255), + STATUS_UPDATE_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00', + STATUS_TYPE VARCHAR(255), + PRIMARY KEY(STATUS_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE, + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE, + FOREIGN KEY (NODE_INSTANCE_ID) REFERENCES WORKFLOW_NODE_DETAIL(NODE_INSTANCE_ID) ON DELETE CASCADE, + FOREIGN KEY (TRANSFER_ID) REFERENCES DATA_TRANSFER_DETAIL(TRANSFER_ID) ON DELETE CASCADE +); + +CREATE TABLE CONFIG_DATA +( + EXPERIMENT_ID VARCHAR(255), + AIRAVATA_AUTO_SCHEDULE SMALLINT NOT NULL, + OVERRIDE_MANUAL_SCHEDULE_PARAMS SMALLINT NOT NULL, + SHARE_EXPERIMENT SMALLINT, + PRIMARY KEY(EXPERIMENT_ID) +); + +CREATE TABLE COMPUTATIONAL_RESOURCE_SCHEDULING +( + RESOURCE_SCHEDULING_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY, + EXPERIMENT_ID VARCHAR(255), + TASK_ID VARCHAR(255), + RESOURCE_HOST_ID VARCHAR(255), + CPU_COUNT INTEGER, + NODE_COUNT INTEGER, + NO_OF_THREADS INTEGER, + QUEUE_NAME VARCHAR(255), + WALLTIME_LIMIT INTEGER, + JOB_START_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00', + TOTAL_PHYSICAL_MEMORY INTEGER, + COMPUTATIONAL_PROJECT_ACCOUNT VARCHAR(255), + PRIMARY KEY(RESOURCE_SCHEDULING_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE, + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE ADVANCE_INPUT_DATA_HANDLING +( + INPUT_DATA_HANDLING_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY, + EXPERIMENT_ID VARCHAR(255), + TASK_ID VARCHAR(255), + WORKING_DIR_PARENT VARCHAR(255), + UNIQUE_WORKING_DIR VARCHAR(255), + STAGE_INPUT_FILES_TO_WORKING_DIR SMALLINT, + CLEAN_AFTER_JOB SMALLINT, + PRIMARY KEY(INPUT_DATA_HANDLING_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE, + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE ADVANCE_OUTPUT_DATA_HANDLING +( + OUTPUT_DATA_HANDLING_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY, + EXPERIMENT_ID VARCHAR(255), + TASK_ID VARCHAR(255), + OUTPUT_DATA_DIR VARCHAR(255), + DATA_REG_URL VARCHAR (255), + PERSIST_OUTPUT_DATA SMALLINT, + PRIMARY KEY(OUTPUT_DATA_HANDLING_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE, + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE QOS_PARAM +( + QOS_ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY, + EXPERIMENT_ID VARCHAR(255), + TASK_ID VARCHAR(255), + START_EXECUTION_AT VARCHAR(255), + EXECUTE_BEFORE VARCHAR(255), + NO_OF_RETRIES INTEGER, + PRIMARY KEY(QOS_ID), + FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE, + FOREIGN KEY (TASK_ID) REFERENCES TASK_DETAIL(TASK_ID) ON DELETE CASCADE +); + +CREATE TABLE COMMUNITY_USER +( + GATEWAY_NAME VARCHAR(256) NOT NULL, + COMMUNITY_USER_NAME VARCHAR(256) NOT NULL, + TOKEN_ID VARCHAR(256) NOT NULL, + COMMUNITY_USER_EMAIL VARCHAR(256) NOT NULL, + PRIMARY KEY (GATEWAY_NAME, COMMUNITY_USER_NAME, TOKEN_ID) +); + +CREATE TABLE CREDENTIALS +( + GATEWAY_ID VARCHAR(256) NOT NULL, + TOKEN_ID VARCHAR(256) NOT NULL, + CREDENTIAL BLOB NOT NULL, + PORTAL_USER_ID VARCHAR(256) NOT NULL, + TIME_PERSISTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (GATEWAY_ID, TOKEN_ID) +); + + http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-service/src/test/resources/zoo.cfg ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/test/resources/zoo.cfg b/modules/gfac/gfac-service/src/test/resources/zoo.cfg new file mode 100644 index 0000000..add0758 --- /dev/null +++ b/modules/gfac/gfac-service/src/test/resources/zoo.cfg @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +tickTime=2000 +initLimit=10 +syncLimit=5 +dataDir=data +clientPort=2181 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/pom.xml b/modules/gfac/gfac-ssh/pom.xml deleted file mode 100644 index 316568e..0000000 --- a/modules/gfac/gfac-ssh/pom.xml +++ /dev/null @@ -1,114 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file - distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under - the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may - obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to - in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF - ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under - the License. --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.airavata</groupId> - <artifactId>gfac</artifactId> - <version>0.16-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>airavata-gfac-ssh</artifactId> - <name>Airavata GFac SSH implementation</name> - <description>This is the extension of</description> - <url>http://airavata.apache.org/</url> - - <dependencies> - - <!--email monitoring--> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-gfac-email-monitor</artifactId> - <version>${project.version}</version> - </dependency> - <!-- Logging --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <!-- GFAC schemas --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-gfac-core</artifactId> - <version>${project.version}</version> - </dependency> - <!-- Credential Store --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-credential-store</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-server-configuration</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-client-configuration</artifactId> - <scope>test</scope> - </dependency> - - - <!-- Test --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.testng</groupId> - <artifactId>testng</artifactId> - <version>6.1.1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> - </dependency> - - <!-- gsi-ssh api dependencies --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>gsissh</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-data-models</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.jcraft</groupId> - <artifactId>jsch</artifactId> - <version>0.1.50</version> - </dependency> - <dependency> - <groupId>org.apache.xmlbeans</groupId> - <artifactId>xmlbeans</artifactId> - <version>${xmlbeans.version}</version> - </dependency> - <dependency> - <groupId>net.schmizz</groupId> - <artifactId>sshj</artifactId> - <version>0.6.1</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java deleted file mode 100644 index 1c31138..0000000 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.ssh.context; - -import org.apache.airavata.gsi.ssh.api.ServerInfo; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; - -public class SSHAuthWrapper { - private ServerInfo serverInfo; - - private AuthenticationInfo authenticationInfo; - - private String key; - - public SSHAuthWrapper(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String key) { - this.serverInfo = serverInfo; - this.authenticationInfo = authenticationInfo; - this.key = key; - } - - public ServerInfo getServerInfo() { - return serverInfo; - } - - public AuthenticationInfo getAuthenticationInfo() { - return authenticationInfo; - } - - public String getKey() { - return key; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java deleted file mode 100644 index 46f1dc3..0000000 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.ssh.handler; - -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.context.MessageContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; -import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.gsi.ssh.api.SSHApiException; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.workspace.experiment.*; -import org.apache.airavata.registry.cpi.ChildDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.*; - -/** - * This handler will copy input data from gateway machine to airavata - * installed machine, later running handlers can copy the input files to computing resource - * <Handler class="AdvancedSCPOutputHandler"> - * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> - * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> - * <property name="userName" value="airavata"/> - * <property name="hostName" value="gw98.iu.xsede.org"/> - * <property name="inputPath" value="/home/airavata/outputData"/> - */ -public class AdvancedSCPInputHandler extends AbstractHandler { - private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class); - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; - public static final int DEFAULT_SSH_PORT = 22; - - private String password = null; - - private String publicKeyPath; - - private String passPhrase; - - private String privateKeyPath; - - private String userName; - - private String hostName; - - private String inputPath; - - public void initProperties(Properties properties) throws GFacHandlerException { - password = (String) properties.get("password"); - passPhrase = (String) properties.get("passPhrase"); - privateKeyPath = (String) properties.get("privateKeyPath"); - publicKeyPath = (String) properties.get("publicKeyPath"); - userName = (String) properties.get("userName"); - hostName = (String) properties.get("hostName"); - inputPath = (String) properties.get("inputPath"); - } - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - int index = 0; - int oldIndex = 0; - List<String> oldFiles = new ArrayList<String>(); - MessageContext inputNew = new MessageContext(); - StringBuffer data = new StringBuffer("|"); - Cluster pbsCluster = null; - - try { - String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); - if (pluginData != null) { - try { - oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); - oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); - if (oldIndex == oldFiles.size()) { - log.info("Old data looks good !!!!"); - } else { - oldIndex = 0; - oldFiles.clear(); - } - } catch (NumberFormatException e) { - log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); - } - } - - AuthenticationInfo authenticationInfo = null; - if (password != null) { - authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); - } else { - authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, - this.passPhrase); - } - - // Server info - String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); - if (index < oldIndex) { - parentPath = oldFiles.get(index); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - } else { - (new File(parentPath)).mkdirs(); - StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - DataTransferDetails detail = new DataTransferDetails(); - TransferStatus status = new TransferStatus(); - // here doesn't matter what the job manager is because we are only doing some file handling - // not really dealing with monitoring or job submission, so we pa - - MessageContext input = jobExecutionContext.getInMessageContext(); - Set<String> parameters = input.getParameters().keySet(); - for (String paramName : parameters) { - InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); - String paramValue = inputParamType.getValue(); - // TODO: Review this with type - if (inputParamType.getType() == DataType.URI) { - try { - URL file = new URL(paramValue); - String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT; - GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT); - pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster(); - paramValue = file.getPath(); - } catch (MalformedURLException e) { - String key = this.userName + this.hostName + DEFAULT_SSH_PORT; - GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT); - pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster(); - log.error(e.getLocalizedMessage(), e); - } - - if (index < oldIndex) { - log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - inputParamType.setValue(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - } else { - String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath); - inputParamType.setValue(stageInputFile); - StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); - status.setTransferState(TransferState.UPLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFile); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - } - // FIXME: what is the thrift model DataType equivalent for URIArray type? -// else if ("URIArray".equals(actualParameter.getType().getType().toString())) { -// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); -// List<String> newFiles = new ArrayList<String>(); -// for (String paramValueEach : split) { -// try { -// URL file = new URL(paramValue); -// this.userName = file.getUserInfo(); -// this.hostName = file.getHost(); -// paramValueEach = file.getPath(); -// } catch (MalformedURLException e) { -// log.error(e.getLocalizedMessage(), e); -// } -// if (index < oldIndex) { -// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); -// newFiles.add(oldFiles.get(index)); -// data.append(oldFiles.get(index++)).append(","); -// } else { -// String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath); -// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); -// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); -// newFiles.add(stageInputFiles); -// } -// } -// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); -// } - inputNew.getParameters().put(paramName, inputParamType); - } - } catch (Exception e) { - log.error(e.getMessage()); - try { - StringWriter errors = new StringWriter(); - e.printStackTrace(new PrintWriter(errors)); - GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); - } - jobExecutionContext.setInMessageContext(inputNew); - } - - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - this.invoke(jobExecutionContext); - } - - private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException { - try { - cluster.scpFrom(paramValue, parentPath); - return "file://" + parentPath + File.separator + (new File(paramValue)).getName(); - } catch (SSHApiException e) { - log.error("Error tranfering remote file to local file, remote path: " + paramValue); - throw new GFacException(e); - } - } -}
