Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerClient.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerClient.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerClient.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerClient.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,746 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.Transceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.oodt.cas.filemgr.datatransfer.DataTransfer; +import org.apache.oodt.cas.filemgr.structs.Element; +import org.apache.oodt.cas.filemgr.structs.Product; +import org.apache.oodt.cas.filemgr.structs.FileTransferStatus; +import org.apache.oodt.cas.filemgr.structs.ProductPage; +import org.apache.oodt.cas.filemgr.structs.ProductType; +import org.apache.oodt.cas.filemgr.structs.Reference; +import org.apache.oodt.cas.filemgr.structs.Query; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroFileManager; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroFileTransferStatus; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroProduct; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroElement; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroQueryResult; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroProductType; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroReference; +import org.apache.oodt.cas.filemgr.structs.exceptions.RepositoryManagerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; +import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException; +import org.apache.oodt.cas.filemgr.structs.exceptions.ValidationLayerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.VersioningException; +import org.apache.oodt.cas.filemgr.structs.exceptions.ConnectionException; +import org.apache.oodt.cas.filemgr.structs.query.ComplexQuery; +import org.apache.oodt.cas.filemgr.structs.query.QueryResult; +import org.apache.oodt.cas.filemgr.util.AvroTypeFactory; +import org.apache.oodt.cas.filemgr.util.GenericFileManagerObjectFactory; +import org.apache.oodt.cas.filemgr.versioning.Versioner; +import org.apache.oodt.cas.metadata.Metadata; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author radu + * + * <p>Implementaion of FileManagerClient that uses apache avro-ipc API.</p> + */ +public class AvroFileManagerClient implements FileManagerClient { + + private static Logger LOG = Logger.getLogger(AvroFileManagerClient.class + .getName()); + + /* Avro-Rpc client */ + Transceiver client; + + /* proxy for the server */ + AvroFileManager proxy; + + /* URL where the fileManager is */ + private URL fileManagerUrl; + + /*DataTransfer class for transferring products*/ + private DataTransfer dataTransfer = null; + + public AvroFileManagerClient(final URL url) throws ConnectionException { + this(url, true); + } + + public AvroFileManagerClient(final URL url, boolean testConnection) throws ConnectionException { + //setup the and start the client + try { + this.fileManagerUrl = url; + InetSocketAddress inetSocketAddress = new InetSocketAddress(url.getHost(),this.fileManagerUrl.getPort()); + this.client = new NettyTransceiver(inetSocketAddress); + proxy = (AvroFileManager) SpecificRequestor.getClient(AvroFileManager.class, client); + + } catch (IOException e) { + e.printStackTrace(); + LOG.log(Level.WARNING, "IOException when connecting to filemgr: [" + + this.fileManagerUrl + "]"); + } + + if (testConnection && !isAlive()) { + throw new ConnectionException("Exception connecting to filemgr: [" + + this.fileManagerUrl + "]"); + } + } + + @Override + public boolean refreshConfigAndPolicy() { + boolean success = false; + + try { + success = proxy.refreshConfigAndPolicy(); + } catch (AvroRemoteException e) { + LOG.log(Level.WARNING, "AvroRemoteException when connecting to filemgr: [" + + this.fileManagerUrl + "]"); + success = false; + } + + return success; + } + + @Override + public boolean isAlive() { + boolean success; + + try { + if(proxy != null) + success = proxy.isAlive(); + else return false; + } catch (AvroRemoteException e) { + LOG.log(Level.WARNING, "AvroRemoteException when connecting to filemgr: [" + + this.fileManagerUrl + "]"); + success = false; + } + + return success; + } + + @Override + public boolean transferringProduct(Product product) throws DataTransferException { + boolean success; + + try { + success = proxy.transferringProduct(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + e.printStackTrace(); + throw new DataTransferException(e.getMessage()); + } + + return success; + } + + @Override + public boolean removeProductTransferStatus(Product product) throws DataTransferException { + boolean success; + try { + success = proxy.removeProductTransferStatus(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + return success; + } + + @Override + public boolean isTransferComplete(Product product) throws DataTransferException { + boolean success; + try { + success = this.proxy.isTransferComplete(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + return success; + } + + @Override + public boolean moveProduct(Product product, String newPath) throws DataTransferException { + boolean success; + try { + success = this.proxy.moveProduct(AvroTypeFactory.getAvroProduct(product), newPath); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + return success; + } + + @Override + public boolean modifyProduct(Product product) throws CatalogException { + boolean success; + try { + success = this.proxy.modifyProduct(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return success; + } + + @Override + public boolean removeProduct(Product product) throws CatalogException { + boolean success; + try { + success = this.proxy.removeProduct(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return success; + } + + @Override + public FileTransferStatus getCurrentFileTransfer() throws DataTransferException { + try { + return AvroTypeFactory.getFileTransferStatus(this.proxy.getCurrentFileTransfer()); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + } + + @Override + public List<FileTransferStatus> getCurrentFileTransfers() throws DataTransferException { + List<FileTransferStatus> fileTransferStatuses = new ArrayList<FileTransferStatus>(); + try { + for (AvroFileTransferStatus afts : this.proxy.getCurrentFileTransfers()) { + fileTransferStatuses.add(AvroTypeFactory.getFileTransferStatus(afts)); + } + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + return fileTransferStatuses; + } + + @Override + public double getProductPctTransferred(Product product) throws DataTransferException { + try { + return this.proxy.getProductPctTransferred(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + } + + @Override + public double getRefPctTransferred(Reference reference) throws DataTransferException { + try { + return this.proxy.getRefPctTransferred(AvroTypeFactory.getAvroReference(reference)); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + } + + @Override + public ProductPage pagedQuery(Query query, ProductType type, int pageNum) throws CatalogException { + try { + + return AvroTypeFactory.getProductPage(this.proxy.pagedQuery( + AvroTypeFactory.getAvroQuery(query), + AvroTypeFactory.getAvroProductType(type), + pageNum + )); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public ProductPage getFirstPage(ProductType type) throws CatalogException { + try { + return AvroTypeFactory.getProductPage(this.proxy.getFirstPage(AvroTypeFactory.getAvroProductType(type))); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public ProductPage getLastPage(ProductType type) throws CatalogException { + try { + return AvroTypeFactory.getProductPage(this.proxy.getLastPage(AvroTypeFactory.getAvroProductType(type))); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public ProductPage getNextPage(ProductType type, ProductPage currPage) throws CatalogException { + try { + return AvroTypeFactory.getProductPage(this.proxy.getNextPage( + AvroTypeFactory.getAvroProductType(type), + AvroTypeFactory.getAvroProductPage(currPage) + )); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public ProductPage getPrevPage(ProductType type, ProductPage currPage) throws CatalogException { + try { + return AvroTypeFactory.getProductPage(this.proxy.getPrevPage( + AvroTypeFactory.getAvroProductType(type), + AvroTypeFactory.getAvroProductPage(currPage) + )); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public String addProductType(ProductType type) throws RepositoryManagerException { + try { + return this.proxy.addProductType(AvroTypeFactory.getAvroProductType(type)); + } catch (AvroRemoteException e) { + throw new RepositoryManagerException(e.getMessage()); + } + } + + @Override + public boolean hasProduct(String productName) throws CatalogException { + try { + return this.proxy.hasProduct(productName); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public int getNumProducts(ProductType type) throws CatalogException { + try { + return this.proxy.getNumProducts(AvroTypeFactory.getAvroProductType(type)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public List<Product> getTopNProducts(int n) throws CatalogException { + List<Product> products = new ArrayList<Product>(); + try { + for (AvroProduct p : this.proxy.getTopNProducts(n)) { + products.add(AvroTypeFactory.getProduct(p)); + } + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return products; + } + + @Override + public List<Product> getTopNProducts(int n, ProductType type) throws CatalogException { + List<Product> products = new ArrayList<Product>(); + try { + for (AvroProduct p : this.proxy.getTopNProductsByProductType(n, AvroTypeFactory.getAvroProductType(type))) { + products.add(AvroTypeFactory.getProduct(p)); + } + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return products; + } + + @Override + public void setProductTransferStatus(Product product) throws CatalogException { + try { + this.proxy.setProductTransferStatus(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public void addProductReferences(Product product) throws CatalogException { + try { + this.proxy.addProductReferences(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public void addMetadata(Product product, Metadata metadata) throws CatalogException { + try { + this.proxy.addMetadata(AvroTypeFactory.getAvroProduct(product), + AvroTypeFactory.getAvroMetadata(metadata)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + + } + + @Override + public boolean updateMetadata(Product product, Metadata met) throws CatalogException { + try { + return this.proxy.updateMetadata( + AvroTypeFactory.getAvroProduct(product), + AvroTypeFactory.getAvroMetadata(met) + ); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public String catalogProduct(Product product) throws CatalogException { + try { + return this.proxy.catalogProduct(AvroTypeFactory.getAvroProduct(product)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public Metadata getMetadata(Product product) throws CatalogException { + try { + return AvroTypeFactory.getMetadata(this.proxy.getMetadata(AvroTypeFactory.getAvroProduct(product))); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public Metadata getReducedMetadata(Product product, List<?> elements) throws CatalogException { + try { + return AvroTypeFactory.getMetadata( + this.proxy.getReducedMetadata(AvroTypeFactory.getAvroProduct(product), (List<String>) elements)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public boolean removeFile(String filePath) throws DataTransferException { + try { + return this.proxy.removeFile(filePath); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + } + + @Override + public byte[] retrieveFile(String filePath, int offset, int numBytes) throws DataTransferException { + try { + return this.proxy.retrieveFile(filePath, offset, numBytes).array(); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + } + + @Override + public void transferFile(String filePath, byte[] fileData, int offset, int numBytes) throws DataTransferException { + try { + this.proxy.transferFile(filePath, ByteBuffer.wrap(fileData), offset, numBytes); + } catch (AvroRemoteException e) { + throw new DataTransferException(e.getMessage()); + } + + } + + @Override + public List<Product> getProductsByProductType(ProductType type) throws CatalogException { + List<Product> products = new ArrayList<Product>(); + try { + for (AvroProduct ap : this.proxy.getProductsByProductType(AvroTypeFactory.getAvroProductType(type))) { + products.add(AvroTypeFactory.getProduct(ap)); + } + return products; + + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + + } + + @Override + public List<Element> getElementsByProductType(ProductType type) throws ValidationLayerException { + List<Element> products = new ArrayList<Element>(); + try { + for (AvroElement ap : this.proxy.getElementsByProductType(AvroTypeFactory.getAvroProductType(type))) { + products.add(AvroTypeFactory.getElement(ap)); + } + } catch (AvroRemoteException e) { + throw new ValidationLayerException(e.getMessage()); + } + return products; + } + + @Override + public Element getElementById(String elementId) throws ValidationLayerException { + try { + return AvroTypeFactory.getElement(this.proxy.getElementById(elementId)); + } catch (AvroRemoteException e) { + throw new ValidationLayerException(e.getMessage()); + } + } + + @Override + public Element getElementByName(String elementName) throws ValidationLayerException { + try { + return AvroTypeFactory.getElement(this.proxy.getElementByName(elementName)); + } catch (AvroRemoteException e) { + throw new ValidationLayerException(e.getMessage()); + } + } + + @Override + public List<QueryResult> complexQuery(ComplexQuery complexQuery) throws CatalogException { + List<QueryResult> queryResults = new ArrayList<QueryResult>(); + try { + List<AvroQueryResult> avroQueryResults = this.proxy.complexQuery(AvroTypeFactory.getAvroComplexQuery(complexQuery)); + for (AvroQueryResult aqr : avroQueryResults) { + queryResults.add(AvroTypeFactory.getQueryResult(aqr)); + } + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return queryResults; + } + + @Override + public List<Product> query(Query query, ProductType type) throws CatalogException { + List<Product> products = new ArrayList<Product>(); + try { + for (AvroProduct ap : this.proxy.query(AvroTypeFactory.getAvroQuery(query), AvroTypeFactory.getAvroProductType(type))) { + products.add(AvroTypeFactory.getProduct(ap)); + } + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return products; + } + + @Override + public ProductType getProductTypeByName(String productTypeName) throws RepositoryManagerException { + try { + return AvroTypeFactory.getProductType(this.proxy.getProductTypeByName(productTypeName)); + } catch (AvroRemoteException e) { + throw new RepositoryManagerException(e.getMessage()); + } + } + + @Override + public ProductType getProductTypeById(String productTypeId) throws RepositoryManagerException { + try { + return AvroTypeFactory.getProductType(this.proxy.getProductTypeById(productTypeId)); + } catch (AvroRemoteException e) { + throw new RepositoryManagerException(e.getMessage()); + } + } + + @Override + public List<ProductType> getProductTypes() throws RepositoryManagerException { + List<ProductType> productTypes = new ArrayList<ProductType>(); + try { + for (AvroProductType apt : this.proxy.getProductTypes()) { + productTypes.add(AvroTypeFactory.getProductType(apt)); + } + } catch (AvroRemoteException e) { + throw new RepositoryManagerException(e.getMessage()); + } + return productTypes; + } + + @Override + public List<Reference> getProductReferences(Product product) throws CatalogException { + List<Reference> references = new ArrayList<Reference>(); + try { + for (AvroReference ar : this.proxy.getProductReferences(AvroTypeFactory.getAvroProduct(product))) { + references.add(AvroTypeFactory.getReference(ar)); + } + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + return references; + } + + @Override + public Product getProductById(String productId) throws CatalogException { + try { + return AvroTypeFactory.getProduct(this.proxy.getProductById(productId)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public Product getProductByName(String productName) throws CatalogException { + try { + return AvroTypeFactory.getProduct(this.proxy.getProductByName(productName)); + } catch (AvroRemoteException e) { + throw new CatalogException(e.getMessage()); + } + } + + @Override + public String ingestProduct(Product product, Metadata metadata, + boolean clientTransfer) throws Exception { + try { + // ingest product + String productId = this.proxy.ingestProduct( + AvroTypeFactory.getAvroProduct(product), + AvroTypeFactory.getAvroMetadata(metadata), + clientTransfer); + + if (clientTransfer) { + LOG.log(Level.FINEST, + "File Manager Client: clientTransfer enabled: " + + "transfering product [" + + product.getProductName() + "]"); + + // we need to transfer the product ourselves + // make sure we have the product ID + if (productId == null) { + throw new Exception("Request to ingest product: " + + product.getProductName() + + " but no product ID returned from File " + + "Manager ingest"); + } + + if (dataTransfer == null) { + throw new Exception("Request to ingest product: [" + + product.getProductName() + + "] using client transfer, but no " + + "dataTransferer specified!"); + } + + product.setProductId(productId); + + if (!Boolean.getBoolean("org.apache.oodt.cas.filemgr.serverside.versioning")) { + // version the product + Versioner versioner = GenericFileManagerObjectFactory + .getVersionerFromClassName(product.getProductType() + .getVersioner()); + versioner.createDataStoreReferences(product, metadata); + + // add the newly versioned references to the data store + try { + addProductReferences(product); + } catch (CatalogException e) { + LOG + .log( + Level.SEVERE, + "ingestProduct: RepositoryManagerException " + + "when adding Product References for Product : " + + product.getProductName() + + " to RepositoryManager: Message: " + + e.getMessage()); + throw e; + } + } else { + product.setProductReferences(getProductReferences(product)); + } + + // now transfer the product + try { + dataTransfer.transferProduct(product); + // now update the product's transfer status in the data + // store + product.setTransferStatus(Product.STATUS_RECEIVED); + + try { + setProductTransferStatus(product); + } catch (CatalogException e) { + LOG + .log( + Level.SEVERE, + "ingestProduct: RepositoryManagerException " + + "when updating product transfer status for Product: " + + product.getProductName() + + " Message: " + e.getMessage()); + throw e; + } + } catch (Exception e) { + LOG.log(Level.SEVERE, + "ingestProduct: DataTransferException when transfering Product: " + + product.getProductName() + ": Message: " + + e.getMessage()); + throw new DataTransferException(e); + } + + } + return productId; + + // error versioning file + } catch (VersioningException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "ingestProduct: VersioningException when versioning Product: " + + product.getProductName() + " with Versioner " + + product.getProductType().getVersioner() + + ": Message: " + e.getMessage()); + throw new VersioningException(e); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "Failed to ingest product [" + product + + "] : " + e.getMessage() + " -- rolling back ingest"); + try { + AvroProduct avroProduct = AvroTypeFactory.getAvroProduct(product); + this.proxy.removeProduct(avroProduct); + } catch (Exception e1) { + LOG.log(Level.SEVERE, "Failed to rollback ingest of product [" + + product + "] : " + e.getMessage()); + } + throw new Exception("Failed to ingest product [" + product + "] : " + + e.getMessage()); + } + + } + + @Override + public Metadata getCatalogValues(Metadata metadata, ProductType productType) throws Exception { + return AvroTypeFactory.getMetadata(this.proxy.getCatalogValues( + AvroTypeFactory.getAvroMetadata(metadata), + AvroTypeFactory.getAvroProductType(productType))); + } + + @Override + public Metadata getOrigValues(Metadata metadata, ProductType productType) throws Exception { + return AvroTypeFactory.getMetadata(this.proxy.getOrigValues( + AvroTypeFactory.getAvroMetadata(metadata), + AvroTypeFactory.getAvroProductType(productType))); + } + + @Override + public Query getCatalogQuery(Query query, ProductType productType) throws Exception { + return AvroTypeFactory.getQuery(this.proxy.getCatalogQuery( + AvroTypeFactory.getAvroQuery(query), + AvroTypeFactory.getAvroProductType(productType) + )); + } + + @Override + public URL getFileManagerUrl() { + return this.fileManagerUrl; + } + + @Override + public void setFileManagerUrl(URL fileManagerUrl) { + this.fileManagerUrl = fileManagerUrl; + + } + + @Override + public DataTransfer getDataTransfer() { + return this.dataTransfer; + } + + @Override + public void setDataTransfer(DataTransfer dataTransfer) { + this.dataTransfer = dataTransfer; + } + +}
Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerServer.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerServer.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerServer.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/AvroFileManagerServer.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.oodt.cas.filemgr.system; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyServer; +import org.apache.avro.ipc.Server; +import org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.oodt.cas.filemgr.catalog.Catalog; +import org.apache.oodt.cas.filemgr.datatransfer.DataTransfer; + +import org.apache.oodt.cas.filemgr.structs.Element; +import org.apache.oodt.cas.filemgr.structs.Product; +import org.apache.oodt.cas.filemgr.structs.FileTransferStatus; +import org.apache.oodt.cas.filemgr.structs.ProductType; +import org.apache.oodt.cas.filemgr.structs.Reference; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroFileManager; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroFileTransferStatus; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroProduct; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroElement; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroQueryResult; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroProductType; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroReference; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroProductPage; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroQuery; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroMetadata; +import org.apache.oodt.cas.filemgr.structs.avrotypes.AvroComplexQuery; +import org.apache.oodt.cas.filemgr.structs.exceptions.RepositoryManagerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; +import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException; +import org.apache.oodt.cas.filemgr.structs.exceptions.ValidationLayerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.VersioningException; +import org.apache.oodt.cas.filemgr.structs.exceptions.QueryFormulationException; +import org.apache.oodt.cas.filemgr.structs.query.QueryResult; +import org.apache.oodt.cas.filemgr.util.AvroTypeFactory; +import org.apache.oodt.cas.filemgr.util.GenericFileManagerObjectFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +/** + * @author radu + * + * <p>Implementaion of FileManagerServer that uses apache avro-ipc API.</p> + */ +public class AvroFileManagerServer implements AvroFileManager, FileManagerServer { + + /*port for server*/ + protected int port = 1999; + + private Server server; + + /* file manager tools */ + FileManager fileManager; + + public AvroFileManagerServer(int port){ + this.port = port; + } + + @Override + public boolean startUp() throws Exception { + server = new NettyServer(new SpecificResponder(AvroFileManager.class,this),new InetSocketAddress(this.port)); + server.start(); + try { + this.fileManager = new FileManager(); + this.loadConfiguration(); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + return true; + } + + public void loadConfiguration() throws IOException { + fileManager.loadConfiguration(); + + String transferFactory = null; + + transferFactory = System.getProperty("filemgr.datatransfer.factory", + "org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory"); + + DataTransfer dataTransfer = GenericFileManagerObjectFactory + .getDataTransferServiceFromFactory(transferFactory); + + dataTransfer + .setFileManagerUrl(new URL("http://localhost:" + port)); + + fileManager.setDataTransfer(dataTransfer); + + } + + + @Override + public boolean shutdown() { + this.server.close(); + return true; + } + + @Override + public boolean isAlive() { + return true; + } + + @Override + public void setCatalog(Catalog catalog) { + this.fileManager.setCatalog(catalog); + + } + + @Override + public boolean refreshConfigAndPolicy() throws AvroRemoteException { + return this.fileManager.refreshConfigAndPolicy(); + } + + @Override + public boolean transferringProduct(AvroProduct p) throws AvroRemoteException { + return this.fileManager.transferringProduct(AvroTypeFactory.getProduct(p)); + } + + @Override + public AvroFileTransferStatus getCurrentFileTransfer() throws AvroRemoteException { + return AvroTypeFactory.getAvroFileTransferStatus(this.fileManager.getCurrentFileTransfer()); + } + + @Override + public List<AvroFileTransferStatus> getCurrentFileTransfers() throws AvroRemoteException { + List<AvroFileTransferStatus> avroFileTransferStatuses = new ArrayList<AvroFileTransferStatus>(); + for (FileTransferStatus fts : this.fileManager.getCurrentFileTransfers()){ + avroFileTransferStatuses.add(AvroTypeFactory.getAvroFileTransferStatus(fts)); + } + return avroFileTransferStatuses; + } + + @Override + public double getProductPctTransferred(AvroProduct product) throws AvroRemoteException { + return this.fileManager.getProductPctTransferred(AvroTypeFactory.getProduct(product)); + } + + @Override + public double getRefPctTransferred(AvroReference reference) throws AvroRemoteException { + return this.fileManager.getRefPctTransferred(AvroTypeFactory.getReference(reference)); + } + + @Override + public boolean removeProductTransferStatus(AvroProduct product) throws AvroRemoteException { + return this.fileManager.removeProductTransferStatus(AvroTypeFactory.getProduct(product)); + } + + @Override + public boolean isTransferComplete(AvroProduct product) throws AvroRemoteException { + return this.fileManager.isTransferComplete(AvroTypeFactory.getProduct(product)); + } + + @Override + public AvroProductPage pagedQuery(AvroQuery query, AvroProductType type, int pageNum) throws AvroRemoteException{ + try { + return AvroTypeFactory.getAvroProductPage(this.fileManager.pagedQuery( + AvroTypeFactory.getQuery(query), + AvroTypeFactory.getProductType(type), + pageNum + )); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroProductPage getFirstPage(AvroProductType type) throws AvroRemoteException { + return AvroTypeFactory.getAvroProductPage( + this.fileManager.getFirstPage(AvroTypeFactory.getProductType(type))); + } + + @Override + public AvroProductPage getLastPage(AvroProductType type) throws AvroRemoteException { + return AvroTypeFactory.getAvroProductPage( + this.fileManager.getLastPage(AvroTypeFactory.getProductType(type))); + } + + @Override + public AvroProductPage getNextPage(AvroProductType type, AvroProductPage currPage) throws AvroRemoteException { + return AvroTypeFactory.getAvroProductPage( + this.fileManager.getNextPage( + AvroTypeFactory.getProductType(type), + AvroTypeFactory.getProductPage(currPage))); + } + + @Override + public AvroProductPage getPrevPage(AvroProductType type, AvroProductPage currPage) throws AvroRemoteException { + return AvroTypeFactory.getAvroProductPage( + this.fileManager.getPrevPage( + AvroTypeFactory.getProductType(type), + AvroTypeFactory.getProductPage(currPage))); + } + + @Override + public boolean setProductTransferStatus(AvroProduct product) throws AvroRemoteException { + try { + return this.fileManager.setProductTransferStatus(AvroTypeFactory.getProduct(product)); + } catch (CatalogException e) { + throw new AvroRemoteException(e); + } + } + + @Override + public int getNumProducts(AvroProductType type) throws AvroRemoteException { + try { + return this.fileManager.getNumProducts(AvroTypeFactory.getProductType(type)); + } catch (CatalogException e) { + throw new AvroRemoteException(e); + } + } + + @Override + public List<AvroProduct> getTopNProductsByProductType(int n, AvroProductType type) throws AvroRemoteException { + List<AvroProduct> avroProducts = new ArrayList<AvroProduct>(); + try { + for (Product p : this.fileManager.getTopNProductsByProductType(n, AvroTypeFactory.getProductType(type))){ + avroProducts.add(AvroTypeFactory.getAvroProduct(p)); + } + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + return avroProducts; + } + + @Override + public List<AvroProduct> getTopNProducts(int n) throws AvroRemoteException { + List<AvroProduct> avroProducts = new ArrayList<AvroProduct>(); + try { + for (Product p : this.fileManager.getTopNProducts(n)){ + avroProducts.add(AvroTypeFactory.getAvroProduct(p)); + } + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + return avroProducts; + } + + @Override + public boolean hasProduct(String productName) throws AvroRemoteException { + try { + return this.fileManager.hasProduct(productName); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroMetadata getMetadata(AvroProduct product) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroMetadata(this.fileManager.getMetadata(AvroTypeFactory.getProduct(product))); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroMetadata getReducedMetadata(AvroProduct product, List<String> elements) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroMetadata(this.fileManager.getReducedMetadata(AvroTypeFactory.getProduct(product), elements)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public List<AvroProductType> getProductTypes() throws AvroRemoteException { + List<AvroProductType> avroProductTypes = new ArrayList<AvroProductType>(); + try { + for (ProductType pt : this.fileManager.getProductTypes()){ + avroProductTypes.add(AvroTypeFactory.getAvroProductType(pt)); + } + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } + return avroProductTypes; + } + + @Override + public List<AvroReference> getProductReferences(AvroProduct product) throws AvroRemoteException { + List<AvroReference> avroProductTypes = new ArrayList<AvroReference>(); + try { + for (Reference r : this.fileManager.getProductReferences(AvroTypeFactory.getProduct(product))){ + avroProductTypes.add(AvroTypeFactory.getAvroReference(r)); + } + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + return avroProductTypes; + } + + @Override + public AvroProduct getProductById(String productId) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroProduct(this.fileManager.getProductById(productId)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroProduct getProductByName(String productName) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroProduct(this.fileManager.getProductByName(productName)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public List<AvroProduct> getProductsByProductType(AvroProductType type) throws AvroRemoteException { + List<AvroProduct> avroProducts = new ArrayList<AvroProduct>(); + try { + List<Product> products = this.fileManager.getProductsByProductType(AvroTypeFactory.getProductType(type)); + if (products != null) { + for (Product p : products) { + avroProducts.add(AvroTypeFactory.getAvroProduct(p)); + } + } + return avroProducts; + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public List<AvroElement> getElementsByProductType(AvroProductType type) throws AvroRemoteException { + List<AvroElement> avroElements = new ArrayList<AvroElement>(); + try { + for (Element e : this.fileManager.getElementsByProductType(AvroTypeFactory.getProductType(type))) { + avroElements.add(AvroTypeFactory.getAvroElement(e)); + } + return avroElements; + } catch (ValidationLayerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroElement getElementById(String elementId) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroElement(this.fileManager.getElementById(elementId)); + } catch (ValidationLayerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroElement getElementByName(String elementName) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroElement(this.fileManager.getElementByName(elementName)); + } catch (ValidationLayerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public List<AvroQueryResult> complexQuery(AvroComplexQuery avroComplexQuery) throws AvroRemoteException { + List<AvroQueryResult> avroQueryResults = new ArrayList<AvroQueryResult>(); + try { + for (QueryResult qr : this.fileManager.complexQuery(AvroTypeFactory.getComplexQuery(avroComplexQuery))){ + avroQueryResults.add(AvroTypeFactory.getAvroQueryResult(qr)); + } + return avroQueryResults; + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public List<AvroProduct> query(AvroQuery avroQuery, AvroProductType avroProductType) throws AvroRemoteException { + List<AvroProduct> avroProducts = new ArrayList<AvroProduct>(); + try { + for (Product p : this.fileManager.query(AvroTypeFactory.getQuery(avroQuery), AvroTypeFactory.getProductType(avroProductType))){ + avroProducts.add(AvroTypeFactory.getAvroProduct(p)); + } + return avroProducts; + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroProductType getProductTypeByName(String productTypeName) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroProductType(this.fileManager.getProductTypeByName(productTypeName)); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroProductType getProductTypeById(String productTypeId) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroProductType(this.fileManager.getProductTypeById(productTypeId)); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean updateMetadata(AvroProduct product, AvroMetadata met) throws AvroRemoteException { + try { + return this.fileManager.updateMetadata(AvroTypeFactory.getProduct(product), AvroTypeFactory.getMetadata(met)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public String addProductType(AvroProductType type) throws AvroRemoteException { + try { + return this.fileManager.addProductType(AvroTypeFactory.getProductType(type)); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public String catalogProduct(AvroProduct product) throws AvroRemoteException { + try { + return this.fileManager.catalogProduct(AvroTypeFactory.getProduct(product)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean addMetadata(AvroProduct product, AvroMetadata met) throws AvroRemoteException { + try { + return this.fileManager.addMetadata(AvroTypeFactory.getProduct(product), AvroTypeFactory.getMetadata(met)) != null; + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean addProductReferences(AvroProduct product) throws AvroRemoteException { + try { + return this.fileManager.addProductReferences(AvroTypeFactory.getProduct(product)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public String ingestProduct(AvroProduct p, AvroMetadata m, boolean clientTransfer) throws AvroRemoteException { + try { + return this.fileManager.ingestProduct(AvroTypeFactory.getProduct(p), AvroTypeFactory.getMetadata(m), clientTransfer); + } catch (VersioningException e) { + throw new AvroRemoteException(e.getMessage()); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } catch (DataTransferException e) { + throw new AvroRemoteException(e.getMessage()); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public ByteBuffer retrieveFile(String filePath, int offset, int numBytes) throws AvroRemoteException { + try { + return ByteBuffer.wrap(this.fileManager.retrieveFile(filePath, offset, numBytes)); + } catch (DataTransferException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean transferFile(String filePath, ByteBuffer fileData, int offset, int numBytes) throws AvroRemoteException { + return this.fileManager.transferFile(filePath,fileData.array(),offset,numBytes); + } + + @Override + public boolean moveProduct(AvroProduct p, String newPath) throws AvroRemoteException { + try { + return this.fileManager.moveProduct(AvroTypeFactory.getProduct(p), newPath); + } catch (DataTransferException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean removeFile(String filePath) throws AvroRemoteException { + try { + return this.fileManager.removeFile(filePath); + } catch (DataTransferException e) { + throw new AvroRemoteException(e.getMessage()); + } catch (IOException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean modifyProduct(AvroProduct p) throws AvroRemoteException { + try { + return this.fileManager.modifyProduct(AvroTypeFactory.getProduct(p)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public boolean removeProduct(AvroProduct p) throws AvroRemoteException { + try { + return this.fileManager.removeProduct(AvroTypeFactory.getProduct(p)); + } catch (CatalogException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroMetadata getCatalogValues(AvroMetadata m, AvroProductType productType) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroMetadata(this.fileManager.getCatalogValues( + AvroTypeFactory.getMetadata(m), AvroTypeFactory.getProductType(productType) + )); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroMetadata getOrigValues(AvroMetadata m, AvroProductType productType) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroMetadata(this.fileManager.getOrigValues( + AvroTypeFactory.getMetadata(m), + AvroTypeFactory.getProductType(productType) + )); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + + @Override + public AvroQuery getCatalogQuery(AvroQuery query, AvroProductType productType) throws AvroRemoteException { + try { + return AvroTypeFactory.getAvroQuery(this.fileManager.getCatalogQuery( + AvroTypeFactory.getQuery(query), + AvroTypeFactory.getProductType(productType) + )); + } catch (RepositoryManagerException e) { + throw new AvroRemoteException(e.getMessage()); + } catch (QueryFormulationException e) { + throw new AvroRemoteException(e.getMessage()); + } + } + +}
