http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/DataManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/DataManagerImpl.java b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/DataManagerImpl.java new file mode 100644 index 0000000..0cef6e0 --- /dev/null +++ b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/DataManagerImpl.java @@ -0,0 +1,444 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.data.manager.core; + +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.client.CredentialStoreClientFactory; +import org.apache.airavata.credential.store.cpi.CredentialStoreService; +import org.apache.airavata.credential.store.datamodel.SSHCredential; +import org.apache.airavata.credential.store.exception.CredentialStoreException; +import org.apache.airavata.data.manager.core.ssh.SSHUtils; +import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; +import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; +import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; +import org.apache.airavata.model.data.movement.DataMovementInterface; +import org.apache.airavata.model.data.movement.DataMovementProtocol; +import org.apache.airavata.model.data.movement.SCPDataMovement; +import org.apache.airavata.model.data.resource.DataReplicaLocationModel; +import org.apache.airavata.model.data.resource.DataResourceModel; +import org.apache.airavata.model.data.resource.DataResourceType; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.DataCatalog; +import org.apache.airavata.registry.cpi.DataCatalogException; + +import org.apache.airavata.data.manager.cpi.DataManager; +import org.apache.airavata.data.manager.cpi.DataManagerException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; + +public class DataManagerImpl implements DataManager { + private final static Logger logger = LoggerFactory.getLogger(DataManagerImpl.class); + + private final AppCatalog appCatalog; + private final DataCatalog dataCatalog; + + public DataManagerImpl() throws DataManagerException { + try { + this.appCatalog = RegistryFactory.getAppCatalog(); + this.dataCatalog = RegistryFactory.getDataCatalog(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + public DataManagerImpl(AppCatalog appCatalog, DataCatalog dataCatalog){ + this.appCatalog = appCatalog; + this.dataCatalog = dataCatalog; + } + + /** + * To create a replica entry for an already existing file(s). This is how the system comes to know about already + * existing resources + * @param dataResourceModel + * @return + */ + @Override + public String registerResource(DataResourceModel dataResourceModel) throws DataManagerException { + try { + String resourceId = dataCatalog.registerResource(dataResourceModel); + return resourceId; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To remove a resource entry from the replica catalog + * @param resourceId + * @return + */ + @Override + public boolean removeResource(String resourceId) throws DataManagerException { + try { + boolean result = dataCatalog.removeResource(resourceId); + return result; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To update an existing data resource model + * @param dataResourceModel + * @return + * @throws DataCatalogException + */ + @Override + public boolean updateResource(DataResourceModel dataResourceModel) throws DataManagerException { + try { + boolean result = dataCatalog.updateResource(dataResourceModel); + return result; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To retrieve a resource object providing the resourceId + * @param resourceId + * @return + */ + @Override + public DataResourceModel getResource(String resourceId) throws DataManagerException { + try { + DataResourceModel dataResource = dataCatalog.getResource(resourceId); + return dataResource; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To create a new data replica location. This is how the system comes to know about already + * existing resources + * + * @param dataReplicaLocationModel + * @return + */ + @Override + public String registerReplicaLocation(DataReplicaLocationModel dataReplicaLocationModel) throws DataManagerException { + try { + String replicaId = dataCatalog.registerReplicaLocation(dataReplicaLocationModel); + return replicaId; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To remove a replica entry from the replica catalog + * + * @param replicaId + * @return + */ + @Override + public boolean removeReplicaLocation(String replicaId) throws DataManagerException { + try { + boolean result = dataCatalog.removeReplicaLocation(replicaId); + return result; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To update an existing data replica model + * + * @param dataReplicaLocationModel + * @return + * @throws DataCatalogException + */ + @Override + public boolean updateReplicaLocation(DataReplicaLocationModel dataReplicaLocationModel) throws DataManagerException { + try { + boolean result = dataCatalog.updateReplicaLocation(dataReplicaLocationModel); + return result; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To retrieve a replica object providing the replicaId + * + * @param replicaId + * @return + */ + @Override + public DataReplicaLocationModel getReplicaLocation(String replicaId) throws DataManagerException { + try { + DataReplicaLocationModel dataReplicaLocationModel = dataCatalog.getReplicaLocation(replicaId); + return dataReplicaLocationModel; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * To retrieve all the replica entries for a given resource id + * + * @param resourceId + * @return + * @throws DataCatalogException + */ + @Override + public List<DataReplicaLocationModel> getAllReplicaLocations(String resourceId) throws DataManagerException { + try { + List<DataReplicaLocationModel> dataReplicaLocationModelList = dataCatalog.getAllReplicaLocations(resourceId); + return dataReplicaLocationModelList; + } catch (DataCatalogException e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * API method to copy a resource to the provided destination storage resource. Only resources of type FILE can be + * copied using this API method. + * + * @param dataResourceId + * @param destStorageResourceId + * @param destinationParentPath + * @return + */ + @Override + public String copyResource(String dataResourceId, String destStorageResourceId, String destinationParentPath) throws DataManagerException { + try { + return copyReplica(dataResourceId, null, destStorageResourceId, destinationParentPath); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * API method to copy the specified replica to the provided destination storage resource. Only resources of type FILE + * can be copied using this API method. Method returns the new replicaId + * + * @param dataResourceId + * @param replicaId + * @param destStorageResourceId + * @param destinationParentPath + * @return + * @throws DataManagerException + */ + @Override + public String copyReplica(String dataResourceId, String replicaId, String destStorageResourceId, String destinationParentPath) throws DataManagerException { + try{ + DataResourceModel dataResourceModel = dataCatalog.getResource(dataResourceId); + if(dataResourceModel.getDataResourceType() != DataResourceType.FILE) + throw new DataCatalogException("Only resources of type FILE can be transferred using this method"); + + StorageResourceDescription destinationStorageResource = appCatalog.getStorageResource() + .getStorageResource(destStorageResourceId); + if(destinationStorageResource == null) + throw new DataCatalogException("Invalid destination storage resource id"); + + List<DataReplicaLocationModel> replicaLocationModels = dataResourceModel.getReplicaLocations(); + if(replicaLocationModels == null || replicaLocationModels.size() == 0) + throw new DataCatalogException("No replicas available for the given data resource"); + + DataReplicaLocationModel sourceReplica = null; + if(replicaId == null || replicaId.isEmpty()) { + //FIXME This should be an intelligent selection + sourceReplica = replicaLocationModels.get(0); + }else{ + for(DataReplicaLocationModel rp : replicaLocationModels){ + if(rp.getReplicaId().equals(replicaId)){ + sourceReplica = rp; + } + } + } + if(sourceReplica == null) + throw new DataManagerException("No matching source replica found"); + + StorageResourceDescription sourceStorageResource = appCatalog.getStorageResource() + .getStorageResource(sourceReplica.getStorageResourceId()); + if(sourceStorageResource == null) + throw new DataCatalogException("Cannot find storage resource of the source replica"); + + //FIXME Currently we support only SCP data movement protocol + List<DataMovementInterface> sourceDataMovementInterfaces = sourceStorageResource.getDataMovementInterfaces(); + Optional<DataMovementInterface> sourceDataMovementInterface = sourceDataMovementInterfaces.stream() + .filter(dmi -> dmi.getDataMovementProtocol() == DataMovementProtocol.SCP).findFirst(); + if(!sourceDataMovementInterface.isPresent()) + throw new DataCatalogException("No matching DMI found for source storage resource"); + List<DataMovementInterface> destDataMovementInterfaces = destinationStorageResource.getDataMovementInterfaces(); + Optional<DataMovementInterface> destDataMovementInterface = destDataMovementInterfaces.stream() + .filter(dmi -> dmi.getDataMovementProtocol() == DataMovementProtocol.SCP).findFirst(); + if(!destDataMovementInterface.isPresent()) + throw new DataCatalogException("No matching DMI found for destination storage resource"); + + //Finding the gateway specific storage preferences for resources + GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(dataResourceModel.getGatewayId()); + List<StoragePreference> storagePreferences = gatewayProfile.getStoragePreferences(); + StoragePreference sourceResourcePreference = null; + for(StoragePreference sp : storagePreferences) { + if (sp.getStorageResourceId().equals(sourceStorageResource.getStorageResourceId())) { + sourceResourcePreference = sp; + break; + } + } + if(sourceResourcePreference == null) + throw new DataCatalogException("Could not find storage preference for storage resource id:" + + sourceStorageResource.getStorageResourceId()); + StoragePreference destResourcePreference = null; + for(StoragePreference sp : storagePreferences) { + if (sp.getStorageResourceId().equals(destStorageResourceId)) { + destResourcePreference = sp; + break; + } + } + if(destResourcePreference == null) + throw new DataCatalogException("Could not find storage preference for storage resource id:" + + destinationStorageResource.getStorageResourceId()); + + String destFilePath = copyUsingScp(gatewayProfile, sourceStorageResource, sourceDataMovementInterface.get(), + sourceResourcePreference, sourceReplica, destinationStorageResource, destDataMovementInterface.get(), + destResourcePreference, destinationParentPath); + + DataReplicaLocationModel dataReplicaLocationModel = new DataReplicaLocationModel(); + dataReplicaLocationModel.setResourceId(dataResourceId); + dataReplicaLocationModel.setFileAbsolutePath(destFilePath); + String newReplicaId = this.registerReplicaLocation(dataReplicaLocationModel); + return newReplicaId; + }catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DataManagerException(e); + } + } + + /** + * This method copies the provided source replica to the destination storage resource and returns the absolute file path + * of the destination file. This method uses the credential store service to fetch required credentials for talking to + * storage resources + * + * @param gatewayProfile + * @param sourceStorageResource + * @param sourceDataMovementInterface + * @param sourceResourcePreference + * @param sourceReplica + * @param destStorageResource + * @param destDataMovementInterface + * @param destResourcePreference + * @param destinationParentPath + * @return + * @throws TException + * @throws ApplicationSettingsException + * @throws AppCatalogException + * @throws JSchException + * @throws IOException + */ + private String copyUsingScp(GatewayResourceProfile gatewayProfile, StorageResourceDescription sourceStorageResource, + DataMovementInterface sourceDataMovementInterface, StoragePreference sourceResourcePreference, + DataReplicaLocationModel sourceReplica, StorageResourceDescription destStorageResource, + DataMovementInterface destDataMovementInterface, StoragePreference destResourcePreference, + String destinationParentPath) + throws Exception { + //Creating JSch sessions + //Source session + Properties config = new java.util.Properties(); + config.put("StrictHostKeyChecking", "no"); + + CredentialStoreService.Client credentialStoreServiceClient = getCredentialStoreServiceClient(); + String sourceHostName = sourceStorageResource.getHostName(); + SCPDataMovement sourceSCPDMI = appCatalog.getComputeResource().getSCPDataMovement(sourceDataMovementInterface.getDataMovementInterfaceId()); + int sourcePort = sourceSCPDMI.getSshPort(); + String sourceLoginUserName = sourceResourcePreference.getLoginUserName(); + JSch sourceJSch = new JSch(); + String sourceCredentialStoreToken; + if(sourceResourcePreference.getResourceSpecificCredentialStoreToken() != null + && !sourceResourcePreference.getResourceSpecificCredentialStoreToken().isEmpty()){ + sourceCredentialStoreToken = sourceResourcePreference.getResourceSpecificCredentialStoreToken(); + }else{ + sourceCredentialStoreToken = gatewayProfile.getCredentialStoreToken(); + } + SSHCredential sourceSshCredential = credentialStoreServiceClient.getSSHCredential(sourceCredentialStoreToken, + gatewayProfile.getGatewayID()); + sourceJSch.addIdentity(UUID.randomUUID().toString(), sourceSshCredential.getPrivateKey().getBytes(), + sourceSshCredential.getPublicKey().getBytes(), sourceSshCredential.getPassphrase().getBytes()); + Session sourceSession = sourceJSch.getSession(sourceLoginUserName, sourceHostName, sourcePort); + sourceSession.setConfig(config); + sourceSession.connect(); + String sourceFilePath = sourceReplica.getFileAbsolutePath(); + + //Destination session + String destHostName = destStorageResource.getHostName(); + SCPDataMovement destSCPDMI = appCatalog.getComputeResource().getSCPDataMovement(destDataMovementInterface + .getDataMovementInterfaceId()); + int destPort = destSCPDMI.getSshPort(); + String destLoginUserName = sourceResourcePreference.getLoginUserName(); + JSch destJSch = new JSch(); + String destCredentialStoreToken; + if(destResourcePreference.getResourceSpecificCredentialStoreToken() != null + && !destResourcePreference.getResourceSpecificCredentialStoreToken().isEmpty()){ + destCredentialStoreToken = destResourcePreference.getResourceSpecificCredentialStoreToken(); + }else{ + destCredentialStoreToken = gatewayProfile.getCredentialStoreToken(); + } + SSHCredential destSshCredential = credentialStoreServiceClient.getSSHCredential(destCredentialStoreToken, + gatewayProfile.getGatewayID()); + destJSch.addIdentity(UUID.randomUUID().toString(), destSshCredential.getPrivateKey().getBytes(), + destSshCredential.getPublicKey().getBytes(), destSshCredential.getPassphrase().getBytes()); + Session destSession = destJSch.getSession(destLoginUserName, destHostName, destPort); + destSession.setConfig(config); + destSession.connect(); + + SSHUtils.scpThirdParty(sourceFilePath, sourceSession, destinationParentPath, destSession); + if(!destinationParentPath.endsWith(File.separator)) + destinationParentPath += File.separator; + String destFilePath = destinationParentPath + (new File(sourceFilePath).getName()); + return destFilePath; + } + + private CredentialStoreService.Client getCredentialStoreServiceClient() throws TException, ApplicationSettingsException { + final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort()); + final String serverHost = ServerSettings.getCredentialStoreServerHost(); + try { + return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort); + } catch (CredentialStoreException e) { + throw new TException("Unable to create credential store client...", e); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/CommandOutput.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/CommandOutput.java b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/CommandOutput.java new file mode 100644 index 0000000..0d8247a --- /dev/null +++ b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/CommandOutput.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.data.manager.core.ssh; + +import com.jcraft.jsch.Channel; +import java.io.OutputStream; + +public interface CommandOutput { + void onOutput(Channel var1); + + OutputStream getStandardError(); + + void exitCode(int var1); + + int getExitCode(); +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHApiException.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHApiException.java b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHApiException.java new file mode 100644 index 0000000..66678e5 --- /dev/null +++ b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHApiException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.data.manager.core.ssh; + +public class SSHApiException extends Exception { + + public SSHApiException(String message) { + super(message); + } + + public SSHApiException(String message, Exception e) { + super(message, e); + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHUtils.java b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHUtils.java new file mode 100644 index 0000000..582cade --- /dev/null +++ b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/SSHUtils.java @@ -0,0 +1,506 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.data.manager.core.ssh; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Arrays; +import java.util.List; + +/** + * Utility class to do all ssh and scp related things. + */ +public class SSHUtils { + private static final Logger log = LoggerFactory.getLogger(SSHUtils.class); + + + /** + * This will copy a local file to a remote location + * + * @param remoteFile remote location you want to transfer the file, this cannot be a directory, if user pass + * a dirctory we do copy it to that directory but we simply return the directory name + * todo handle the directory name as input and return the proper final output file name + * @param localFile Local file to transfer, this can be a directory + * @return returns the final remote file path, so that users can use the new file location + */ + public static String scpTo(String localFile, String remoteFile, Session session) throws IOException, + JSchException, SSHApiException { + FileInputStream fis = null; + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + boolean ptimestamp = true; + + // exec 'scp -t rfile' remotely + String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile; + Channel channel = session.openChannel("exec"); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + ((ChannelExec) channel).setCommand(command); + + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + File _lfile = new File(localFile); + + if (ptimestamp) { + command = "T" + (_lfile.lastModified() / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = _lfile.length(); + command = "C0644 " + filesize + " "; + if (localFile.lastIndexOf('/') > 0) { + command += localFile.substring(localFile.lastIndexOf('/') + 1); + } else { + command += localFile; + } + command += "\n"; + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + // send a content of localFile + fis = new FileInputStream(localFile); + byte[] buf = new byte[1024]; + while (true) { + int len = fis.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); //out.flush(); + } + fis.close(); + fis = null; + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + out.close(); + stdOutReader.onOutput(channel); + + + channel.disconnect(); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + //since remote file is always a file we just return the file + return remoteFile; + } + + /** + * This method will copy a remote file to a local directory + * + * @param remoteFile remote file path, this has to be a full qualified path + * @param localFile This is the local file to copy, this can be a directory too + * @return returns the final local file path of the new file came from the remote resource + */ + public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException, + JSchException, SSHApiException { + FileOutputStream fos = null; + try { + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + + // exec 'scp -f remotefile' remotely + String command = "scp -f " + remoteFile; + Channel channel = session.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + if (!channel.isClosed()){ + channel.connect(); + } + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + + String file = null; + for (int i = 0; ; i++) { + in.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + file = new String(buf, 0, i); + break; + } + } + + //System.out.println("filesize="+filesize+", file="+file); + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + // read a content of lfile + fos = new FileOutputStream(prefix == null ? localFile : prefix + file); + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; + } + fos.write(buf, 0, foo); + filesize -= foo; + if (filesize == 0L) break; + } + fos.close(); + fos = null; + + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new SSHApiException(error); + } + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + try { + if (fos != null) fos.close(); + } catch (Exception ee) { + } + } + } + + /** + * This method will copy a remote file to a local directory + * + * @param sourceFile remote file path, this has to be a full qualified path + * @param sourceSession JSch session for source + * @param destinationFile This is the local file to copy, this can be a directory too + * @param destinationSession JSch Session for target + * @return returns the final local file path of the new file came from the remote resource + */ + public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession) throws + Exception{ + OutputStream sout = null; + InputStream sin = null; + OutputStream dout = null; + InputStream din = null; + try { + String prefix = null; + + // exec 'scp -f sourceFile' + String sourceCommand = "scp -f " + sourceFile; + Channel sourceChannel = sourceSession.openChannel("exec"); + ((ChannelExec) sourceChannel).setCommand(sourceCommand); + StandardOutReader sourceStdOutReader = new StandardOutReader(); + ((ChannelExec) sourceChannel).setErrStream(sourceStdOutReader.getStandardError()); + // get I/O streams for remote scp + sout = sourceChannel.getOutputStream(); + sin = sourceChannel.getInputStream(); + sourceChannel.connect(); + + + boolean ptimestamp = true; + // exec 'scp -t destinationFile' + String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + destinationFile; + Channel targetChannel = destinationSession.openChannel("exec"); + StandardOutReader targetStdOutReader = new StandardOutReader(); + ((ChannelExec) targetChannel).setErrStream(targetStdOutReader.getStandardError()); + ((ChannelExec) targetChannel).setCommand(command); + // get I/O streams for remote scp + dout = targetChannel.getOutputStream(); + din = targetChannel.getInputStream(); + targetChannel.connect(); + + if (checkAck(din) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new Exception(error); + } + + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + sout.write(buf, 0, 1); + sout.flush(); + + while (true) { + int c = checkAck(sin); + if (c != 'C') { + break; + } + + // read '0644 ' + sin.read(buf, 0, 5); + + long fileSize = 0L; + while (true) { + if (sin.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + fileSize = fileSize * 10L + (long) (buf[0] - '0'); + } + + String fileName = null; + for (int i = 0; ; i++) { + sin.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + fileName = new String(buf, 0, i); + break; + } + } + String initData = "C0644 " + fileSize + " " + fileName + "\n"; + assert dout != null; + dout.write(initData.getBytes()); + dout.flush(); + + // send '\0' to source + buf[0] = 0; + sout.write(buf, 0, 1); + sout.flush(); + + int rLength; + while (true) { + if (buf.length < fileSize) rLength = buf.length; + else rLength = (int) fileSize; + rLength = sin.read(buf, 0, rLength); // read content of the source File + if (rLength < 0) { + // error + break; + } + dout.write(buf, 0, rLength); // write to destination file + fileSize -= rLength; + if (fileSize == 0L) break; + } + + // send '\0' to target + buf[0] = 0; + dout.write(buf, 0, 1); + dout.flush(); + if (checkAck(din) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new Exception(error); + } + dout.close(); + dout = null; + + if (checkAck(sin) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new Exception(error); + } + + // send '\0' + buf[0] = 0; + sout.write(buf, 0, 1); + sout.flush(); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + throw e; + } finally { + try { + if (dout != null) dout.close(); + } catch (Exception ee) { + log.error("", ee); + } + try { + if (din != null) din.close(); + } catch (Exception ee) { + log.error("", ee); + } + try { + if (sout != null) sout.close(); + } catch (Exception ee) { + log.error("", ee); + } + try { + if (din != null) din.close(); + } catch (Exception ee) { + log.error("", ee); + } + } + } + + public static void makeDirectory(String path, Session session) throws IOException, JSchException, SSHApiException { + + // exec 'scp -t rfile' remotely + String command = "mkdir -p " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); +// session.disconnect(); + + throw new SSHApiException("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("mkdir:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + channel.disconnect(); + } + + public static List<String> listDirectory(String path, Session session) throws IOException, JSchException, + SSHApiException { + + // exec 'scp -t rfile' remotely + String command = "ls " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); +// session.disconnect(); + + throw new SSHApiException("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + stdOutReader.onOutput(channel); + stdOutReader.getStdOutputString(); + if (stdOutReader.getStdErrorString().contains("ls:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + channel.disconnect(); + return Arrays.asList(stdOutReader.getStdOutputString().split("\n")); + } + + + static int checkAck(InputStream in) throws IOException { + int b = in.read(); + if (b == 0) return b; + if (b == -1) return b; + + if (b == 1 || b == 2) { + StringBuffer sb = new StringBuffer(); + int c; + do { + c = in.read(); + sb.append((char) c); + } + while (c != '\n'); + if (b == 1) { // error + System.out.print(sb.toString()); + } + if (b == 2) { // fatal error + System.out.print(sb.toString()); + } + } + return b; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/StandardOutReader.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/StandardOutReader.java b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/StandardOutReader.java new file mode 100644 index 0000000..c03660b --- /dev/null +++ b/modules/file-manager/data-manager-core/src/main/java/org/apache/airavata/data/manager/core/ssh/StandardOutReader.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.data.manager.core.ssh; + +import com.jcraft.jsch.Channel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class StandardOutReader implements CommandOutput { + + private static final Logger logger = LoggerFactory.getLogger(StandardOutReader.class); + String stdOutputString = null; + ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); + private int exitCode; + + public void onOutput(Channel channel) { + try { + StringBuffer pbsOutput = new StringBuffer(""); + InputStream inputStream = channel.getInputStream(); + byte[] tmp = new byte[1024]; + do { + while (inputStream.available() > 0) { + int i = inputStream.read(tmp, 0, 1024); + if (i < 0) break; + pbsOutput.append(new String(tmp, 0, i)); + } + } while (!channel.isClosed()) ; + String output = pbsOutput.toString(); + this.setStdOutputString(output); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + + } + + + public void exitCode(int code) { + System.out.println("Program exit code - " + code); + this.exitCode = code; + } + + @Override + public int getExitCode() { + return exitCode; + } + + public String getStdOutputString() { + return stdOutputString; + } + + public void setStdOutputString(String stdOutputString) { + this.stdOutputString = stdOutputString; + } + + public String getStdErrorString() { + return errorStream.toString(); + } + + public OutputStream getStandardError() { + return errorStream; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerFactoryTest.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerFactoryTest.java b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerFactoryTest.java new file mode 100644 index 0000000..359c286 --- /dev/null +++ b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerFactoryTest.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.aoache.airavata.data.manager.core; + +import junit.framework.Assert; +import org.apache.airavata.data.manager.core.DataManagerImpl; +import org.apache.airavata.data.manager.cpi.DataManager; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.DataCatalog; +import org.apache.airavata.registry.cpi.DataCatalogException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataManagerFactoryTest { + private final static Logger logger = LoggerFactory.getLogger(DataManagerFactoryTest.class); + + @Test + public void testCreateDataManager() throws DataCatalogException, DataCatalogException, AppCatalogException { + DataCatalog dataCatalog = RegistryFactory.getDataCatalog(); + AppCatalog appCatalog = RegistryFactory.getAppCatalog(); + DataManager dataManagerImpl = new DataManagerImpl(appCatalog, dataCatalog); + Assert.assertNotNull(dataManagerImpl); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerImplTest.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerImplTest.java b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerImplTest.java new file mode 100644 index 0000000..42c4091 --- /dev/null +++ b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/DataManagerImplTest.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.aoache.airavata.data.manager.core; + +import org.apache.airavata.data.manager.core.DataManagerImpl; +import org.apache.airavata.data.manager.cpi.DataManagerException; +import org.aoache.airavata.data.manager.core.utils.AppCatInit; +import org.aoache.airavata.data.manager.core.utils.DataCatInit; +import org.apache.airavata.model.data.resource.DataReplicaLocationModel; +import org.apache.airavata.model.data.resource.DataResourceModel; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.DataCatalog; +import org.apache.airavata.data.manager.cpi.DataManager; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; + +public class DataManagerImplTest { + private final static Logger logger = LoggerFactory.getLogger(DataManagerImplTest.class); + private static AppCatInit appCatInit; + private static DataCatInit dataCatInit; + private static DataManager dataManager; + private static DataResourceModel dataResourceModel; + private static DataReplicaLocationModel dataReplicaLocationModel; + + @BeforeClass + public static void setUp() { + try { + System.out.println("********** SET UP ************"); + appCatInit = new AppCatInit("appcatalog-derby.sql"); + appCatInit.initializeDB(); + dataCatInit = new DataCatInit("datacatalog-derby.sql"); + dataCatInit.initializeDB(); + AppCatalog appCatalog = RegistryFactory.getAppCatalog(); + DataCatalog dataCatalog = RegistryFactory.getDataCatalog(); + DataManagerImplTest.dataManager = new DataManagerImpl(appCatalog, dataCatalog); + dataResourceModel = new DataResourceModel(); + dataResourceModel.setResourceName("test-file.txt"); + dataReplicaLocationModel = new DataReplicaLocationModel(); + dataReplicaLocationModel.setReplicaName("1-st-replica"); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + @AfterClass + public static void tearDown() throws Exception { + System.out.println("********** TEAR DOWN ************"); + dataCatInit.stopDerbyServer(); + } + + @Test + public void testPublishDataResource(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + org.junit.Assert.assertNotNull(resourceId); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testRemoveDataResource(){ + try { + boolean result = dataManager.removeResource("234234234"); + Assert.assertFalse(result); + String resourceId = dataManager.registerResource(dataResourceModel); + Assert.assertNotNull(resourceId); + result = dataManager.removeResource(resourceId); + Assert.assertTrue(result); + result = dataManager.removeResource(resourceId); + Assert.assertFalse(result); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testGetDataResource(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + Assert.assertNotNull(resourceId); + DataResourceModel persistedCopy = dataManager.getResource(resourceId); + Assert.assertNotNull(persistedCopy); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testUpdateDataResource(){ + try { + dataResourceModel.setResourceId(UUID.randomUUID().toString()); + boolean result = dataManager.updateResource(dataResourceModel); + Assert.assertFalse(result); + dataManager.registerResource(dataResourceModel); + dataResourceModel.setResourceName("updated-name"); + dataManager.updateResource(dataResourceModel); + dataResourceModel = dataManager.getResource(dataResourceModel.getResourceId()); + Assert.assertTrue(dataResourceModel.getResourceName().equals("updated-name")); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testPublishReplicaLocation(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + dataReplicaLocationModel.setResourceId(resourceId); + String replicaId = dataManager.registerReplicaLocation(dataReplicaLocationModel); + org.junit.Assert.assertNotNull(replicaId); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testRemoveReplicaLocation(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + dataReplicaLocationModel.setResourceId(resourceId); + String replicaId = dataManager.registerReplicaLocation(dataReplicaLocationModel); + boolean result = dataManager.removeReplicaLocation(replicaId); + Assert.assertTrue(result); + result = dataManager.removeReplicaLocation(dataReplicaLocationModel.getReplicaId()); + Assert.assertFalse(result); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testGetReplicaLocation(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + dataReplicaLocationModel.setResourceId(resourceId); + String replicaId = dataManager.registerReplicaLocation(dataReplicaLocationModel); + DataReplicaLocationModel persistedCopy = dataManager.getReplicaLocation(replicaId); + Assert.assertNotNull(persistedCopy); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testUpdateReplicaLocation(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + dataReplicaLocationModel.setResourceId(resourceId); + String replicaId = dataManager.registerReplicaLocation(dataReplicaLocationModel); + DataReplicaLocationModel persistedCopy = dataManager.getReplicaLocation(replicaId); + persistedCopy.setReplicaDescription("updated-description"); + dataManager.updateReplicaLocation(persistedCopy); + persistedCopy = dataManager.getReplicaLocation(replicaId); + Assert.assertTrue(persistedCopy.getReplicaDescription().equals("updated-description")); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testGetAllReplicaLocations(){ + try { + String resourceId = dataManager.registerResource(dataResourceModel); + dataReplicaLocationModel.setResourceId(resourceId); + String replicaId = dataManager.registerReplicaLocation(dataReplicaLocationModel); + List<DataReplicaLocationModel> replicaLocationModelList = dataManager.getAllReplicaLocations(resourceId); + Assert.assertTrue(replicaLocationModelList.get(0).getReplicaId().equals(replicaId)); + } catch (DataManagerException e) { + e.printStackTrace(); + Assert.fail(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/AppCatInit.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/AppCatInit.java b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/AppCatInit.java new file mode 100644 index 0000000..cdeb0ce --- /dev/null +++ b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/AppCatInit.java @@ -0,0 +1,320 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.aoache.airavata.data.manager.core.utils; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.derby.drda.NetworkServerControl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.URI; +import java.sql.*; +import java.util.StringTokenizer; + +public class AppCatInit { + private static final Logger logger = LoggerFactory.getLogger(AppCatInit.class); + public static final String DERBY_SERVER_MODE_SYS_PROPERTY = "derby.drda.startNetworkServer"; + public String scriptName = "appcatalog-derby.sql"; + private NetworkServerControl server; + private static final String delimiter = ";"; + public static final String COMPUTE_RESOURCE_TABLE = "COMPUTE_RESOURCE"; + private String jdbcUrl = null; + private String jdbcDriver = null; + private String jdbcUser = null; + private String jdbcPassword = null; + + public AppCatInit(String scriptName) { + this.scriptName = scriptName; + } + + public static boolean checkStringBufferEndsWith(StringBuffer buffer, String suffix) { + if (suffix.length() > buffer.length()) { + return false; + } + // this loop is done on purpose to avoid memory allocation performance + // problems on various JDKs + // StringBuffer.lastIndexOf() was introduced in jdk 1.4 and + // implementation is ok though does allocation/copying + // StringBuffer.toString().endsWith() does massive memory + // allocation/copying on JDK 1.5 + // See http://issues.apache.org/bugzilla/show_bug.cgi?id=37169 + int endIndex = suffix.length() - 1; + int bufferIndex = buffer.length() - 1; + while (endIndex >= 0) { + if (buffer.charAt(bufferIndex) != suffix.charAt(endIndex)) { + return false; + } + bufferIndex--; + endIndex--; + } + return true; + } + + private static boolean isServerStarted(NetworkServerControl server, int ntries) + { + for (int i = 1; i <= ntries; i ++) + { + try { + Thread.sleep(500); + server.ping(); + return true; + } + catch (Exception e) { + if (i == ntries) + return false; + } + } + return false; + } + + public void initializeDB() { + + try{ + jdbcDriver = ServerSettings.getSetting("appcatalog.jdbc.driver"); + jdbcUrl = ServerSettings.getSetting("appcatalog.jdbc.url"); + jdbcUser = ServerSettings.getSetting("appcatalog.jdbc.user"); + jdbcPassword = ServerSettings.getSetting("appcatalog.jdbc.password"); + jdbcUrl = jdbcUrl + "?" + "user=" + jdbcUser + "&" + "password=" + jdbcPassword; + } catch (ApplicationSettingsException e) { + logger.error("Unable to read properties", e); + } + + startDerbyInServerMode(); + if(!isServerStarted(server, 20)){ + throw new RuntimeException("Derby server cound not started within five seconds..."); + } +// startDerbyInEmbeddedMode(); + + Connection conn = null; + try { + Class.forName(jdbcDriver).newInstance(); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); + if (!isDatabaseStructureCreated(COMPUTE_RESOURCE_TABLE, conn)) { + executeSQLScript(conn); + logger.info("New Database created for App Catalog !!!"); + } else { + logger.debug("Database already created for App Catalog!"); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("Database failure", e); + } finally { + try { + if (conn != null){ + if (!conn.getAutoCommit()) { + conn.commit(); + } + conn.close(); + } + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + + public static boolean isDatabaseStructureCreated(String tableName, Connection conn) { + try { + System.out.println("Running a query to test the database tables existence."); + // check whether the tables are already created with a query + Statement statement = null; + try { + statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("select * from " + tableName); + if (rs != null) { + rs.close(); + } + } finally { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + return false; + } + } + } catch (SQLException e) { + return false; + } + + return true; + } + + private void executeSQLScript(Connection conn) throws Exception { + StringBuffer sql = new StringBuffer(); + BufferedReader reader = null; + try{ + + InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(scriptName); + reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("//")) { + continue; + } + if (line.startsWith("--")) { + continue; + } + StringTokenizer st = new StringTokenizer(line); + if (st.hasMoreTokens()) { + String token = st.nextToken(); + if ("REM".equalsIgnoreCase(token)) { + continue; + } + } + sql.append(" ").append(line); + + // SQL defines "--" as a comment to EOL + // and in Oracle it may contain a hint + // so we cannot just remove it, instead we must end it + if (line.indexOf("--") >= 0) { + sql.append("\n"); + } + if ((checkStringBufferEndsWith(sql, delimiter))) { + executeSQL(sql.substring(0, sql.length() - delimiter.length()), conn); + sql.replace(0, sql.length(), ""); + } + } + // Catch any statements not followed by ; + if (sql.length() > 0) { + executeSQL(sql.toString(), conn); + } + }catch (IOException e){ + logger.error("Error occurred while executing SQL script for creating Airavata database", e); + throw new Exception("Error occurred while executing SQL script for creating Airavata database", e); + }finally { + if (reader != null) { + reader.close(); + } + + } + + } + + private static void executeSQL(String sql, Connection conn) throws Exception { + // Check and ignore empty statements + if ("".equals(sql.trim())) { + return; + } + + Statement statement = null; + try { + logger.debug("SQL : " + sql); + + boolean ret; + int updateCount = 0, updateCountTotal = 0; + statement = conn.createStatement(); + ret = statement.execute(sql); + updateCount = statement.getUpdateCount(); + do { + if (!ret) { + if (updateCount != -1) { + updateCountTotal += updateCount; + } + } + ret = statement.getMoreResults(); + if (ret) { + updateCount = statement.getUpdateCount(); + } + } while (ret); + + logger.debug(sql + " : " + updateCountTotal + " rows affected"); + + SQLWarning warning = conn.getWarnings(); + while (warning != null) { + logger.warn(warning + " sql warning"); + warning = warning.getNextWarning(); + } + conn.clearWarnings(); + } catch (SQLException e) { + if (e.getSQLState().equals("X0Y32")) { + // eliminating the table already exception for the derby + // database + logger.info("Table Already Exists", e); + } else { + throw new Exception("Error occurred while executing : " + sql, e); + } + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + logger.error("Error occurred while closing result set.", e); + } + } + } + } + + private void startDerbyInServerMode() { + try { + System.setProperty(DERBY_SERVER_MODE_SYS_PROPERTY, "true"); + server = new NetworkServerControl(InetAddress.getByName("0.0.0.0"), + 20000, + jdbcUser, jdbcPassword); + java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true); + server.start(consoleWriter); + } catch (IOException e) { + logger.error("Unable to start Apache derby in the server mode! Check whether " + + "specified port is available"); + } catch (Exception e) { + logger.error("Unable to start Apache derby in the server mode! Check whether " + + "specified port is available"); + } + + } + + public static int getPort(String jdbcURL){ + try{ + String cleanURI = jdbcURL.substring(5); + URI uri = URI.create(cleanURI); + return uri.getPort(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return -1; + } + } + + private void startDerbyInEmbeddedMode(){ + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + DriverManager.getConnection("jdbc:derby:memory:unit-testing-jpa;create=true").close(); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + public void stopDerbyServer() { + try { + server.shutdown(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/DataCatInit.java ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/DataCatInit.java b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/DataCatInit.java new file mode 100644 index 0000000..d1c8491 --- /dev/null +++ b/modules/file-manager/data-manager-core/src/test/java/org/aoache/airavata/data/manager/core/utils/DataCatInit.java @@ -0,0 +1,315 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.aoache.airavata.data.manager.core.utils; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.registry.core.data.catalog.utils.DataCatalogConstants; +import org.apache.derby.drda.NetworkServerControl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.URI; +import java.sql.*; +import java.util.StringTokenizer; + +public class DataCatInit { + private static final Logger logger = LoggerFactory.getLogger(DataCatInit.class); + public static final String DERBY_SERVER_MODE_SYS_PROPERTY = "derby.drda.startNetworkServer"; + public String scriptName = "datacatalog-derby.sql"; + private NetworkServerControl server; + private static final String delimiter = ";"; + private String jdbcUrl = null; + private String jdbcDriver = null; + private String jdbcUser = null; + private String jdbcPassword = null; + + public DataCatInit(String scriptName) { + this.scriptName = scriptName; + } + + public static boolean checkStringBufferEndsWith(StringBuffer buffer, String suffix) { + if (suffix.length() > buffer.length()) { + return false; + } + // this loop is done on purpose to avoid memory allocation performance + // problems on various JDKs + // StringBuffer.lastIndexOf() was introduced in jdk 1.4 and + // implementation is ok though does allocation/copying + // StringBuffer.toString().endsWith() does massive memory + // allocation/copying on JDK 1.5 + // See http://issues.apache.org/bugzilla/show_bug.cgi?id=37169 + int endIndex = suffix.length() - 1; + int bufferIndex = buffer.length() - 1; + while (endIndex >= 0) { + if (buffer.charAt(bufferIndex) != suffix.charAt(endIndex)) { + return false; + } + bufferIndex--; + endIndex--; + } + return true; + } + + private static boolean isServerStarted(NetworkServerControl server, int ntries) + { + for (int i = 1; i <= ntries; i ++) + { + try { + Thread.sleep(500); + server.ping(); + return true; + } + catch (Exception e) { + if (i == ntries) + return false; + } + } + return false; + } + + public void initializeDB() { + try{ + jdbcDriver = ServerSettings.getSetting("datacatalog.jdbc.driver"); + jdbcUrl = ServerSettings.getSetting("datacatalog.jdbc.url"); + jdbcUser = ServerSettings.getSetting("datacatalog.jdbc.user"); + jdbcPassword = ServerSettings.getSetting("datacatalog.jdbc.password"); + jdbcUrl = jdbcUrl + "?" + "user=" + jdbcUser + "&" + "password=" + jdbcPassword; + } catch (ApplicationSettingsException e) { + logger.error("Unable to read properties", e); + } + + startDerbyInServerMode(); + if(!isServerStarted(server, 20)){ + throw new RuntimeException("Derby server could not started within five seconds..."); + } + Connection conn = null; + try { + Class.forName(jdbcDriver).newInstance(); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); + if (!isDatabaseStructureCreated(DataCatalogConstants.CONFIGURATION, conn)) { + executeSQLScript(conn); + logger.info("New Database created for Data Catalog !!!"); + } else { + logger.debug("Database already created for Data Catalog!"); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("Database failure", e); + } finally { + try { + if (conn != null){ + if (!conn.getAutoCommit()) { + conn.commit(); + } + conn.close(); + } + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + + public static boolean isDatabaseStructureCreated(String tableName, Connection conn) { + try { + System.out.println("Running a query to test the database tables existence."); + // check whether the tables are already created with a query + Statement statement = null; + try { + statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("select * from " + tableName); + if (rs != null) { + rs.close(); + } + } finally { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + return false; + } + } + } catch (SQLException e) { + return false; + } + + return true; + } + + private void executeSQLScript(Connection conn) throws Exception { + StringBuffer sql = new StringBuffer(); + BufferedReader reader = null; + try{ + + InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(scriptName); + reader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("//")) { + continue; + } + if (line.startsWith("--")) { + continue; + } + StringTokenizer st = new StringTokenizer(line); + if (st.hasMoreTokens()) { + String token = st.nextToken(); + if ("REM".equalsIgnoreCase(token)) { + continue; + } + } + sql.append(" ").append(line); + + // SQL defines "--" as a comment to EOL + // and in Oracle it may contain a hint + // so we cannot just remove it, instead we must end it + if (line.indexOf("--") >= 0) { + sql.append("\n"); + } + if ((checkStringBufferEndsWith(sql, delimiter))) { + executeSQL(sql.substring(0, sql.length() - delimiter.length()), conn); + sql.replace(0, sql.length(), ""); + } + } + // Catch any statements not followed by ; + if (sql.length() > 0) { + executeSQL(sql.toString(), conn); + } + }catch (IOException e){ + logger.error("Error occurred while executing SQL script for creating Airavata Data Catalog database", e); + throw new Exception("Error occurred while executing SQL script for creating Airavata Data Catalog database", e); + }finally { + if (reader != null) { + reader.close(); + } + } + } + + private static void executeSQL(String sql, Connection conn) throws Exception { + // Check and ignore empty statements + if ("".equals(sql.trim())) { + return; + } + + Statement statement = null; + try { + logger.debug("SQL : " + sql); + + boolean ret; + int updateCount = 0, updateCountTotal = 0; + statement = conn.createStatement(); + ret = statement.execute(sql); + updateCount = statement.getUpdateCount(); + do { + if (!ret) { + if (updateCount != -1) { + updateCountTotal += updateCount; + } + } + ret = statement.getMoreResults(); + if (ret) { + updateCount = statement.getUpdateCount(); + } + } while (ret); + + logger.debug(sql + " : " + updateCountTotal + " rows affected"); + + SQLWarning warning = conn.getWarnings(); + while (warning != null) { + logger.warn(warning + " sql warning"); + warning = warning.getNextWarning(); + } + conn.clearWarnings(); + } catch (SQLException e) { + if (e.getSQLState().equals("X0Y32")) { + // eliminating the table already exception for the derby + // database + logger.info("Table Already Exists", e); + } else { + throw new Exception("Error occurred while executing : " + sql, e); + } + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + logger.error("Error occurred while closing result set.", e); + } + } + } + } + + private void startDerbyInServerMode() { + try { + System.setProperty(DERBY_SERVER_MODE_SYS_PROPERTY, "true"); + server = new NetworkServerControl(InetAddress.getByName("0.0.0.0"), + 20000, + jdbcUser, jdbcPassword); + java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true); + server.start(consoleWriter); + } catch (IOException e) { + logger.error("Unable to start Apache derby in the server mode! Check whether " + + "specified port is available"); + } catch (Exception e) { + logger.error("Unable to start Apache derby in the server mode! Check whether " + + "specified port is available"); + } + + } + + public static int getPort(String jdbcURL){ + try{ + String cleanURI = jdbcURL.substring(5); + URI uri = URI.create(cleanURI); + return uri.getPort(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return -1; + } + } + + private void startDerbyInEmbeddedMode(){ + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + DriverManager.getConnection("jdbc:derby:memory:unit-testing-jpa;create=true").close(); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + public void stopDerbyServer() { + try { + server.shutdown(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1a0772f/modules/file-manager/data-manager-cpi/pom.xml ---------------------------------------------------------------------- diff --git a/modules/file-manager/data-manager-cpi/pom.xml b/modules/file-manager/data-manager-cpi/pom.xml new file mode 100644 index 0000000..93137ef --- /dev/null +++ b/modules/file-manager/data-manager-cpi/pom.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>data-manager</artifactId> + <groupId>org.apache.airavata</groupId> + <version>0.16-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>data-manager-cpi</artifactId> + <packaging>jar</packaging> + <name>Airavata Data Manager CPI</name> + <url>http://airavata.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-data-models</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-commons</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file
