Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManager.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManager.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManager.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManager.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,1121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +//OODT imports +import org.apache.oodt.cas.filemgr.catalog.Catalog; +import org.apache.oodt.cas.filemgr.datatransfer.DataTransfer; +import org.apache.oodt.cas.filemgr.datatransfer.TransferStatusTracker; +import org.apache.oodt.cas.filemgr.metadata.ProductMetKeys; +import org.apache.oodt.cas.filemgr.metadata.extractors.FilemgrMetExtractor; +import org.apache.oodt.cas.filemgr.repository.RepositoryManager; +import org.apache.oodt.cas.filemgr.structs.*; +import org.apache.oodt.cas.filemgr.structs.exceptions.*; +import org.apache.oodt.cas.filemgr.structs.query.ComplexQuery; +import org.apache.oodt.cas.filemgr.structs.query.QueryFilter; +import org.apache.oodt.cas.filemgr.structs.query.QueryResult; +import org.apache.oodt.cas.filemgr.structs.query.QueryResultComparator; +import org.apache.oodt.cas.filemgr.structs.query.filter.ObjectTimeEvent; +import org.apache.oodt.cas.filemgr.structs.query.filter.TimeEvent; +import org.apache.oodt.cas.filemgr.structs.type.TypeHandler; +import org.apache.oodt.cas.filemgr.util.GenericFileManagerObjectFactory; +import org.apache.oodt.cas.filemgr.versioning.Versioner; +import org.apache.oodt.cas.filemgr.versioning.VersioningUtils; +import org.apache.oodt.cas.metadata.Metadata; +import org.apache.oodt.cas.metadata.exceptions.MetExtractionException; +import org.apache.oodt.commons.date.DateUtils; + +//JDK imports +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.io.*; +import java.net.URI; +import java.net.URISyntaxException; +import com.google.common.collect.Lists; + +/** + * @author radu + * + * <p>Manages the {@link Catalog}, {@link RepositoryManager} and {@link DataTransfer}. + * Without the rpc logic.</p> + */ +public class FileManager { + + private Catalog catalog = null; + + /* our RepositoryManager */ + private RepositoryManager repositoryManager = null; + + /* our DataTransfer */ + private DataTransfer dataTransfer = null; + + /* our log stream */ + private static final Logger LOG = Logger.getLogger(FileManager.class.getName()); + + + /* our data transfer status tracker */ + private TransferStatusTracker transferStatusTracker = null; + + /* whether or not to expand a product instance into met */ + private boolean expandProductMet; + + public FileManager() throws Exception { + LOG.log(Level.INFO, "File Manager started by " + + System.getProperty("user.name", "unknown")); + } + + public void setCatalog(Catalog catalog) { + this.catalog = catalog; + } + + public boolean refreshConfigAndPolicy() { + boolean status = false; + + try { + this.loadConfiguration(); + status = true; + } catch (Exception e) { + e.printStackTrace(); + LOG + .log( + Level.SEVERE, + "Unable to refresh configuration for file manager " + + "server: server may be in inoperable state: Message: " + + e.getMessage()); + } + + return status; + } + + public boolean transferringProduct(Product product) { + transferStatusTracker.transferringProduct(product); + return true; + } + + public FileTransferStatus getCurrentFileTransfer() { + return transferStatusTracker + .getCurrentFileTransfer(); + } + + public List<FileTransferStatus> getCurrentFileTransfers() { + return transferStatusTracker.getCurrentFileTransfers(); + } + + public double getProductPctTransferred(Product product) { + + double pct = transferStatusTracker.getPctTransferred(product); + return pct; + } + + public double getRefPctTransferred(Reference reference) { + double pct = 0.0; + + try { + pct = transferStatusTracker.getPctTransferred(reference); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.WARNING, + "Exception getting transfer percentage for ref: [" + + reference.getOrigReference() + "]: Message: " + + e.getMessage()); + } + return pct; + } + + public boolean removeProductTransferStatus(Product product) { + transferStatusTracker.removeProductTransferStatus(product); + return true; + } + + public boolean isTransferComplete(Product product) { + return transferStatusTracker.isTransferComplete(product); + } + + public ProductPage pagedQuery( + Query query, + ProductType type, + int pageNum) throws CatalogException { + + ProductPage prodPage = null; + + try { + prodPage = catalog.pagedQuery(this.getCatalogQuery(query, type), type, pageNum); + + if (prodPage == null) { + prodPage = ProductPage.blankPage(); + } else { + // it is possible here that the underlying catalog did not + // set the ProductType + // to obey the contract of the File Manager, we need to make + // sure its set here + setProductType(prodPage.getPageProducts()); + } + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.WARNING, + "Catalog exception performing paged query for product type: [" + + type.getProductTypeId() + "] query: [" + query + + "]: Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + + return prodPage; + } + + public ProductPage getFirstPage( + ProductType type) { + ProductPage page = catalog.getFirstPage(type); + try { + setProductType(page.getPageProducts()); + } catch (Exception e) { + LOG.log(Level.WARNING, + "Unable to set product types for product page list: [" + + page + "]"); + } + return page; + } + + public ProductPage getLastPage( + ProductType type ) { + + ProductPage page = catalog.getLastProductPage(type); + try { + setProductType(page.getPageProducts()); + } catch (Exception e) { + LOG.log(Level.WARNING, + "Unable to set product types for product page list: [" + + page + "]"); + } + return page; + } + + public ProductPage getNextPage( + ProductType type , + ProductPage currPage) { + + ProductPage page = catalog.getNextPage(type, currPage); + try { + setProductType(page.getPageProducts()); + } catch (Exception e) { + LOG.log(Level.WARNING, + "Unable to set product types for product page list: [" + + page + "]"); + } + return page; + } + + public ProductPage getPrevPage( + ProductType type, + ProductPage currPage) { + ProductPage page = catalog.getPrevPage(type, currPage); + try { + setProductType(page.getPageProducts()); + } catch (Exception e) { + LOG.log(Level.WARNING, + "Unable to set product types for product page list: [" + + page + "]"); + } + return page; + } + + public String addProductType(ProductType productType) + throws RepositoryManagerException { + repositoryManager.addProductType(productType); + return productType.getProductTypeId(); + + } + + public synchronized boolean setProductTransferStatus( + Product product) + throws CatalogException { + catalog.setProductTransferStatus(product); + return true; + } + + public int getNumProducts(ProductType type) + throws CatalogException { + int numProducts = -1; + + try { + numProducts = catalog.getNumProducts(type); + } catch (CatalogException e) { + e.printStackTrace(); + LOG.log(Level.WARNING, + "Exception when getting num products: Message: " + + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + + return numProducts; + } + + public List<Product> getTopNProducts(int n) + throws CatalogException { + List<Product> topNProducts = null; + + try { + topNProducts = catalog.getTopNProducts(n); + return topNProducts; + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.WARNING, + "Exception when getting topN products: Message: " + + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + } + + public List<Product> getTopNProductsByProductType(int n, + ProductType type) + throws CatalogException { + List<Product> topNProducts = null; + try { + topNProducts = catalog.getTopNProducts(n, type); + return topNProducts; + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.WARNING, + "Exception when getting topN products by product type: [" + + type.getProductTypeId() + "]: Message: " + + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + + } + + public boolean hasProduct(String productName) throws CatalogException { + Product p = catalog.getProductByName(productName); + return p != null + && p.getTransferStatus().equals(Product.STATUS_RECEIVED); + } + + public List<ProductType> getProductTypes() + throws RepositoryManagerException { + try { + return repositoryManager.getProductTypes(); + } catch (RepositoryManagerException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Unable to obtain product types from repository manager: Message: " + + e.getMessage()); + throw new RepositoryManagerException(e.getMessage()); + } + } + + public List<Reference> getProductReferences( + Product product) + throws CatalogException { + try { + return catalog.getProductReferences(product); + } catch (CatalogException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "Unable to obtain references for product: [" + + product.getProductName() + "]: Message: " + + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + + } + + public Product getProductById(String productId) + throws CatalogException { + Product product = null; + + try { + product = catalog.getProductById(productId); + // it is possible here that the underlying catalog did not + // set the ProductType + // to obey the contract of the File Manager, we need to make + // sure its set here + product.setProductType(this.repositoryManager + .getProductTypeById(product.getProductType() + .getProductTypeId())); + return product; + } catch (CatalogException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "Unable to obtain product by id: [" + + productId + "]: Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } catch (RepositoryManagerException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "Unable to obtain product type by id: [" + + product.getProductType().getProductTypeId() + + "]: Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + + } + + public Product getProductByName(String productName) + throws CatalogException { + Product product = null; + try { + product = catalog.getProductByName(productName); + // it is possible here that the underlying catalog did not + // set the ProductType + // to obey the contract of the File Manager, we need to make + // sure its set here + + product.setProductType(this.repositoryManager + .getProductTypeById(product.getProductType() + .getProductTypeId())); + return product; + } catch (CatalogException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "Unable to obtain product by name: [" + + productName + "]: Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } catch (RepositoryManagerException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "Unable to obtain product type by id: [" + + product.getProductType().getProductTypeId() + + "]: Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + } + + public List<Product> getProductsByProductType( + ProductType type) + throws CatalogException { + try { + + return catalog.getProductsByProductType(type); + + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Exception obtaining products by product type for type: [" + + type.getName() + "]: Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + } + + public List<Element> getElementsByProductType( + ProductType type) + throws ValidationLayerException { + try { + return catalog.getValidationLayer().getElements(type); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Exception obtaining elements for product type: [" + + type.getName() + "]: Message: " + e.getMessage()); + throw new ValidationLayerException(e.getMessage()); + } + } + + public Element getElementById(String elementId) + throws ValidationLayerException { + try { + return catalog.getValidationLayer().getElementById(elementId); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "exception retrieving element by id: [" + + elementId + "]: Message: " + e.getMessage()); + throw new ValidationLayerException(e.getMessage()); + } + } + + public Element getElementByName(String elementName) + throws ValidationLayerException { + try { + return catalog.getValidationLayer() + .getElementByName(elementName); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "exception retrieving element by name: [" + + elementName + "]: Message: " + e.getMessage()); + throw new ValidationLayerException(e.getMessage()); + } + } + + public List<QueryResult> complexQuery( + ComplexQuery complexQuery) throws CatalogException { + try { + + // get ProductTypes + List<ProductType> productTypes = null; + if (complexQuery.getReducedProductTypeNames() == null) { + productTypes = this.repositoryManager.getProductTypes(); + } else { + productTypes = new Vector<ProductType>(); + for (String productTypeName : complexQuery + .getReducedProductTypeNames()) + productTypes.add(this.repositoryManager + .getProductTypeByName(productTypeName)); + } + + // get Metadata + List<QueryResult> queryResults = new LinkedList<QueryResult>(); + for (ProductType productType : productTypes) { + List<String> productIds = catalog.query(this.getCatalogQuery( + complexQuery, productType), productType); + for (String productId : productIds) { + Product product = catalog.getProductById(productId); + product.setProductType(productType); + QueryResult qr = new QueryResult(product, this + .getReducedMetadata(product, complexQuery + .getReducedMetadata())); + qr.setToStringFormat(complexQuery + .getToStringResultFormat()); + queryResults.add(qr); + } + } + + LOG.log(Level.INFO, "Query returned " + queryResults.size() + + " results"); + + // filter query results + if (complexQuery.getQueryFilter() != null) { + queryResults = applyFilterToResults(queryResults, complexQuery + .getQueryFilter()); + LOG.log(Level.INFO, "Filter returned " + queryResults.size() + + " results"); + } + + // sort query results + if (complexQuery.getSortByMetKey() != null) + queryResults = sortQueryResultList(queryResults, complexQuery + .getSortByMetKey()); + + return queryResults; + } catch (Exception e) { + e.printStackTrace(); + throw new CatalogException("Failed to perform complex query : " + + e.getMessage()); + } + } + + public ProductType getProductTypeByName(String productTypeName) + throws RepositoryManagerException { + + ProductType pt = repositoryManager.getProductTypeByName(productTypeName); + return pt; + } + public ProductType getProductTypeById(String productTypeId) + throws RepositoryManagerException { + try { + return repositoryManager.getProductTypeById(productTypeId); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Exception obtaining product type by id for product type: [" + + productTypeId + "]: Message: " + e.getMessage()); + throw new RepositoryManagerException(e.getMessage()); + } + } + + public synchronized boolean updateMetadata(Product product, + Metadata met) throws CatalogException{ + Metadata oldMetadata = catalog.getMetadata(product); + catalog.removeMetadata(oldMetadata, product); + catalog.addMetadata(met, product); + return true; + } + + public String ingestProduct(Product p, + Metadata m, boolean clientTransfer) + throws VersioningException, RepositoryManagerException, + DataTransferException, CatalogException { + try { + // first, create the product + p.setTransferStatus(Product.STATUS_TRANSFER); + catalogProduct(p); + + // now add the metadata + Metadata expandedMetdata = addMetadata(p, m); + + // version the product + if (!clientTransfer || (clientTransfer + && Boolean.getBoolean("org.apache.oodt.cas.filemgr.serverside.versioning"))) { + Versioner versioner = null; + try { + versioner = GenericFileManagerObjectFactory + .getVersionerFromClassName(p.getProductType().getVersioner()); + versioner.createDataStoreReferences(p, expandedMetdata); + } catch (Exception e) { + LOG.log(Level.SEVERE, + "ingestProduct: VersioningException when versioning Product: " + + p.getProductName() + " with Versioner " + + p.getProductType().getVersioner() + ": Message: " + + e.getMessage()); + throw new VersioningException(e); + } + + // add the newly versioned references to the data store + addProductReferences(p); + } + + if (!clientTransfer) { + LOG.log(Level.FINEST, + "File Manager: ingest: no client transfer enabled, " + + "server transfering product: [" + p.getProductName() + "]"); + + // now transfer the product + try { + dataTransfer.transferProduct(p); + // now update the product's transfer status in the data store + p.setTransferStatus(Product.STATUS_RECEIVED); + + try { + catalog.setProductTransferStatus(p); + } catch (CatalogException e) { + LOG.log(Level.SEVERE, "ingestProduct: CatalogException " + + "when updating product transfer status for Product: " + + p.getProductName() + " Message: " + e.getMessage()); + throw e; + } + } catch (Exception e) { + LOG.log(Level.SEVERE, + "ingestProduct: DataTransferException when transfering Product: " + + p.getProductName() + ": Message: " + e.getMessage()); + throw new DataTransferException(e); + } + } + + + // that's it! + return p.getProductId(); + + } catch (Exception e) { + e.printStackTrace(); + throw new CatalogException("Error ingesting product [" + p + "] : " + + e.getMessage()); + } + + } + + public byte[] retrieveFile(String filePath, int offset, int numBytes) + throws DataTransferException { + FileInputStream is = null; + try { + byte[] fileData = new byte[numBytes]; + (is = new FileInputStream(filePath)).skip(offset); + int bytesRead = is.read(fileData); + if (bytesRead != -1) { + byte[] fileDataTruncated = new byte[bytesRead]; + System.arraycopy(fileData, 0, fileDataTruncated, 0, bytesRead); + return fileDataTruncated; + } else { + return new byte[0]; + } + } catch (Exception e) { + LOG.log(Level.SEVERE, "Failed to read '" + numBytes + + "' bytes from file '" + filePath + "' at index '" + offset + + "' : " + e.getMessage(), e); + throw new DataTransferException("Failed to read '" + numBytes + + "' bytes from file '" + filePath + "' at index '" + offset + + "' : " + e.getMessage(), e); + } finally { + try { is.close(); } catch (Exception e) {} + } + } + + public boolean transferFile(String filePath, byte[] fileData, int offset, + int numBytes) { + File outFile = new File(filePath); + boolean success = true; + + FileOutputStream fOut = null; + + if (outFile.exists()) { + try { + fOut = new FileOutputStream(outFile, true); + } catch (FileNotFoundException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "FileNotFoundException when trying to use RandomAccess file on " + + filePath + ": Message: " + e.getMessage()); + success = false; + } + } else { + // create the output directory + String outFileDirPath = outFile.getAbsolutePath().substring(0, + outFile.getAbsolutePath().lastIndexOf("/")); + LOG.log(Level.INFO, "Outfile directory: " + outFileDirPath); + File outFileDir = new File(outFileDirPath); + outFileDir.mkdirs(); + + try { + fOut = new FileOutputStream(outFile, false); + } catch (FileNotFoundException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "FileNotFoundException when trying to use RandomAccess file on " + + filePath + ": Message: " + e.getMessage()); + success = false; + } + } + + if (success) { + try { + fOut.write(fileData, (int) offset, (int) numBytes); + } catch (IOException e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, "IOException when trying to write file " + + filePath + ": Message: " + e.getMessage()); + success = false; + } finally { + if (fOut != null) { + try { + fOut.close(); + } catch (Exception ignore) { + } + + fOut = null; + } + } + } + + outFile = null; + return success; + } + + public boolean moveProduct(Product p, String newPath) + throws DataTransferException { + + // first thing we care about is if the product is flat or heirarchical + if (p.getProductStructure().equals(Product.STRUCTURE_FLAT)) { + // we just need to get its first reference + if (p.getProductReferences() == null + || (p.getProductReferences() != null && p + .getProductReferences().size() != 1)) { + throw new DataTransferException( + "Flat products must have a single reference: cannot move"); + } + + // okay, it's fine to move it + // first, we need to update the data store ref + Reference r = (Reference) p.getProductReferences().get(0); + if (r.getDataStoreReference().equals( + new File(newPath).toURI().toString())) { + throw new DataTransferException("cannot move product: [" + + p.getProductName() + "] to same location: [" + + r.getDataStoreReference() + "]"); + } + + // create a copy of the current data store path: we'll need it to + // do the data transfer + Reference copyRef = new Reference(r); + + // update the copyRef to have the data store ref as the orig ref + // the the newLoc as the new ref + copyRef.setOrigReference(r.getDataStoreReference()); + copyRef.setDataStoreReference(new File(newPath).toURI().toString()); + + p.getProductReferences().clear(); + p.getProductReferences().add(copyRef); + + // now transfer it + try { + this.dataTransfer.transferProduct(p); + } catch (IOException e) { + throw new DataTransferException(e.getMessage()); + } + + // now delete the original copy + try { + if (!new File(new URI(copyRef.getOrigReference())).delete()) { + LOG.log(Level.WARNING, "Deletion of original file: [" + + r.getDataStoreReference() + + "] on product move returned false"); + } + } catch (URISyntaxException e) { + throw new DataTransferException( + "URI Syntax exception trying to remove original product ref: Message: " + + e.getMessage()); + } + + // now save the updated reference + try { + this.catalog.modifyProduct(p); + return true; + } catch (CatalogException e) { + throw new DataTransferException(e.getMessage()); + } + } else + throw new UnsupportedOperationException( + "Moving of heirarhical and stream products not supported yet"); + } + + public boolean removeFile(String filePath) throws DataTransferException, IOException { + // TODO(bfoster): Clean this up so that it deletes by product not file. + Product product = new Product(); + Reference r = new Reference(); + r.setDataStoreReference(filePath); + product.setProductReferences(Lists.newArrayList(r)); + dataTransfer.deleteProduct(product); + return true; + } + + public boolean modifyProduct(Product p) throws CatalogException { + try { + catalog.modifyProduct(p); + } catch (CatalogException e) { + LOG.log(Level.WARNING, "Exception modifying product: [" + + p.getProductId() + "]: Message: " + e.getMessage(), e); + throw e; + } + + return true; + } + + public boolean removeProduct(Product p) throws CatalogException { + try { + catalog.removeProduct(p); + } catch (CatalogException e) { + LOG.log(Level.WARNING, "Exception modifying product: [" + + p.getProductId() + "]: Message: " + e.getMessage(), e); + throw e; + } + + return true; + } + + public synchronized String catalogProduct(Product p) + throws CatalogException { + try { + catalog.addProduct(p); + } catch (CatalogException e) { + LOG.log(Level.SEVERE, + "ingestProduct: CatalogException when adding Product: " + + p.getProductName() + " to Catalog: Message: " + + e.getMessage()); + throw e; + } + + return p.getProductId(); + } + + public synchronized Metadata addMetadata(Product p, Metadata m) + throws CatalogException { + + //apply handlers + try { + m = this.getCatalogValues(m, p.getProductType()); + } catch (Exception e) { + LOG.log(Level.SEVERE, "Failed to get handlers for product '" + p + + "' : " + e.getMessage()); + } + + // first do server side metadata extraction + Metadata metadata = runExtractors(p, m); + + try { + catalog.addMetadata(metadata, p); + } catch (CatalogException e) { + LOG.log(Level.SEVERE, + "ingestProduct: CatalogException when adding metadata " + + metadata + " for product: " + p.getProductName() + + ": Message: " + e.getMessage()); + throw e; + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "ingestProduct: General Exception when adding metadata " + + metadata + " for product: " + p.getProductName() + + ": Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + + return metadata; + } + + private Metadata runExtractors(Product product, Metadata metadata) { + // make sure that the product type definition is present + try { + product.setProductType(repositoryManager.getProductTypeById(product + .getProductType().getProductTypeId())); + } catch (RepositoryManagerException e) { + LOG.log(Level.SEVERE, "Failed to load ProductType " + product + .getProductType().getProductTypeId(), e); + return null; + } + + Metadata met = new Metadata(); + met.addMetadata(metadata.getHashtable()); + + if (product.getProductType().getExtractors() != null) { + for (ExtractorSpec spec: product.getProductType().getExtractors()) { + FilemgrMetExtractor extractor = GenericFileManagerObjectFactory + .getExtractorFromClassName(spec.getClassName()); + extractor.configure(spec.getConfiguration()); + LOG.log(Level.INFO, "Running Met Extractor: [" + + extractor.getClass().getName() + + "] for product type: [" + + product.getProductType().getName() + "]"); + try { + met = extractor.extractMetadata(product, met); + } catch (MetExtractionException e) { + LOG.log(Level.SEVERE, + "Exception extractor metadata from product: [" + + product.getProductName() + + "]: using extractor: [" + + extractor.getClass().getName() + + "]: Message: " + e.getMessage(), e); + } + } + } + + return met; + } + + public synchronized boolean addProductReferences(Product product) + throws CatalogException { + catalog.addProductReferences(product); + return true; + } + + private void setProductType(List<Product> products) throws Exception { + if (products != null && products.size() > 0) { + for (Iterator<Product> i = products.iterator(); i.hasNext();) { + Product p = i.next(); + try { + p.setProductType(repositoryManager.getProductTypeById(p + .getProductType().getProductTypeId())); + } catch (RepositoryManagerException e) { + throw new Exception(e.getMessage()); + } + } + } + } + + public List<Product> query(Query query, ProductType productType) throws CatalogException { + List<String> productIdList = null; + List<Product> productList = null; + + try { + productIdList = catalog.query(this.getCatalogQuery(query, productType), productType); + + if (productIdList != null && productIdList.size() > 0) { + productList = new Vector<Product>(productIdList.size()); + for (Iterator<String> i = productIdList.iterator(); i.hasNext();) { + String productId = i.next(); + Product product = catalog.getProductById(productId); + // it is possible here that the underlying catalog did not + // set the ProductType + // to obey the contract of the File Manager, we need to make + // sure its set here + product.setProductType(this.repositoryManager + .getProductTypeById(product.getProductType() + .getProductTypeId())); + productList.add(product); + } + return productList; + } else { + return new Vector<Product>(); // null values not supported by XML-RPC + } + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Exception performing query against catalog for product type: [" + + productType.getName() + "] Message: " + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + } + + public Metadata getReducedMetadata(Product product, List<String> elements) throws CatalogException { + try { + Metadata m = null; + if (elements != null && elements.size() > 0) { + m = catalog.getReducedMetadata(product, elements); + }else { + m = this.getMetadata(product); + } + if(this.expandProductMet) m = this.buildProductMetadata(product, m); + return this.getOrigValues(m, product.getProductType()); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Exception obtaining metadata from catalog for product: [" + + product.getProductId() + "]: Message: " + + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + } + + public Metadata getMetadata(Product product) throws CatalogException { + try { + Metadata m = catalog.getMetadata(product); + if(this.expandProductMet) m = this.buildProductMetadata(product, m); + return this.getOrigValues(m, product.getProductType()); + } catch (Exception e) { + e.printStackTrace(); + LOG.log(Level.SEVERE, + "Exception obtaining metadata from catalog for product: [" + + product.getProductId() + "]: Message: " + + e.getMessage()); + throw new CatalogException(e.getMessage()); + } + } + + public Metadata getOrigValues(Metadata metadata, ProductType productType) + throws RepositoryManagerException { + List<TypeHandler> handlers = this.repositoryManager.getProductTypeById( + productType.getProductTypeId()).getHandlers(); + if (handlers != null) { + for (Iterator<TypeHandler> iter = handlers.iterator(); iter + .hasNext();) + iter.next().postGetMetadataHandle(metadata); + } + return metadata; + } + + public Metadata getCatalogValues(Metadata metadata, ProductType productType) + throws RepositoryManagerException { + List<TypeHandler> handlers = this.repositoryManager.getProductTypeById( + productType.getProductTypeId()).getHandlers(); + if (handlers != null) { + for (Iterator<TypeHandler> iter = handlers.iterator(); iter + .hasNext();) + iter.next().preAddMetadataHandle(metadata); + } + return metadata; + } + + public Query getCatalogQuery(Query query, ProductType productType) + throws RepositoryManagerException, QueryFormulationException { + List<TypeHandler> handlers = this.repositoryManager.getProductTypeById( + productType.getProductTypeId()).getHandlers(); + if (handlers != null) { + for (Iterator<TypeHandler> iter = handlers.iterator(); iter + .hasNext();) + iter.next().preQueryHandle(query); + } + return query; + } + + @SuppressWarnings("unchecked") + private List<QueryResult> applyFilterToResults( + List<QueryResult> queryResults, QueryFilter queryFilter) + throws Exception { + List<TimeEvent> events = new LinkedList<TimeEvent>(); + for (QueryResult queryResult : queryResults) { + Metadata m = new Metadata(); + m.addMetadata(queryFilter.getPriorityMetKey(), queryResult + .getMetadata().getMetadata(queryFilter.getPriorityMetKey())); + events.add(new ObjectTimeEvent<QueryResult>( + DateUtils.getTimeInMillis(DateUtils.toCalendar(queryResult + .getMetadata().getMetadata(queryFilter.getStartDateTimeMetKey()), + DateUtils.FormatType.UTC_FORMAT), DateUtils.julianEpoch), + DateUtils.getTimeInMillis(DateUtils.toCalendar(queryResult.getMetadata() + .getMetadata(queryFilter.getEndDateTimeMetKey()), + DateUtils.FormatType.UTC_FORMAT), + DateUtils.julianEpoch), queryFilter.getConverter() + .convertToPriority(this.getCatalogValues(m, + queryResult.getProduct().getProductType()) + .getMetadata(queryFilter.getPriorityMetKey())), + queryResult)); + } + events = queryFilter.getFilterAlgor().filterEvents(events); + List<QueryResult> filteredQueryResults = new LinkedList<QueryResult>(); + for (TimeEvent event : events) + filteredQueryResults.add(((ObjectTimeEvent<QueryResult>) event) + .getTimeObject()); + + return filteredQueryResults; + } + + private List<QueryResult> sortQueryResultList(List<QueryResult> queryResults, + String sortByMetKey) { + QueryResult[] resultsArray = queryResults + .toArray(new QueryResult[queryResults.size()]); + QueryResultComparator qrComparator = new QueryResultComparator(); + qrComparator.setSortByMetKey(sortByMetKey); + Arrays.sort(resultsArray, qrComparator); + return Arrays.asList(resultsArray); + } + + private Metadata buildProductMetadata(Product product, Metadata metadata) + throws CatalogException { + Metadata pMet = new Metadata(); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_ID, product.getProductId() != null ? + product.getProductId():"unknown"); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_NAME, product.getProductName() != null ? + product.getProductName():"unknown"); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_STRUCTURE, product + .getProductStructure() != null ? product.getProductStructure():"unknown"); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_TRANSFER_STATUS, product + .getTransferStatus() != null ? product.getTransferStatus():"unknown"); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_ROOT_REFERENCE, product.getRootRef() != null ? + VersioningUtils + .getAbsolutePathFromUri(product.getRootRef().getDataStoreReference()):"unknown"); + + List<Reference> refs = product.getProductReferences(); + + if (refs == null || (refs != null && refs.size() == 0)) { + refs = this.catalog.getProductReferences(product); + } + + for (Reference r : refs) { + pMet.replaceMetadata(ProductMetKeys.PRODUCT_ORIG_REFS, r.getOrigReference() != null + ? VersioningUtils + .getAbsolutePathFromUri(r.getOrigReference()):"unknown"); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS, + r.getDataStoreReference() != null ? + VersioningUtils.getAbsolutePathFromUri(r.getDataStoreReference()):"unknown"); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_FILE_SIZES, String.valueOf(r + .getFileSize())); + pMet.replaceMetadata(ProductMetKeys.PRODUCT_MIME_TYPES, + r.getMimeType() != null ? r.getMimeType().getName() : "unknown"); + } + + return pMet; + } + + public void loadConfiguration() throws FileNotFoundException, IOException { + // set up the configuration, if there is any + if (System.getProperty("org.apache.oodt.cas.filemgr.properties") != null) { + String configFile = System + .getProperty("org.apache.oodt.cas.filemgr.properties"); + LOG.log(Level.INFO, + "Loading File Manager Configuration Properties from: [" + configFile + + "]"); + System.getProperties().load(new FileInputStream(new File(configFile))); + } + + String metaFactory = null, dataFactory = null; + + metaFactory = System.getProperty("filemgr.catalog.factory", + "org.apache.oodt.cas.filemgr.catalog.DataSourceCatalogFactory"); + dataFactory = System + .getProperty("filemgr.repository.factory", + "org.apache.oodt.cas.filemgr.repository.DataSourceRepositoryManagerFactory"); + + catalog = GenericFileManagerObjectFactory + .getCatalogServiceFromFactory(metaFactory); + repositoryManager = GenericFileManagerObjectFactory + .getRepositoryManagerServiceFromFactory(dataFactory); + + + transferStatusTracker = new TransferStatusTracker(catalog); + + // got to start the server before setting up the transfer client since + // it + // checks for a live server + + expandProductMet = Boolean + .getBoolean("org.apache.oodt.cas.filemgr.metadata.expandProduct"); + } + + public void setDataTransfer(DataTransfer dataTransfer){ + this.dataTransfer = dataTransfer; + } + +}
Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClient.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClient.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClient.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClient.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +// OODT imports +import org.apache.oodt.cas.filemgr.datatransfer.DataTransfer; +import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; +import org.apache.oodt.cas.filemgr.structs.exceptions.RepositoryManagerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.ValidationLayerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException; +import org.apache.oodt.cas.filemgr.structs.Element; +import org.apache.oodt.cas.filemgr.structs.FileTransferStatus; +import org.apache.oodt.cas.filemgr.structs.ProductPage; +import org.apache.oodt.cas.filemgr.structs.ProductType; +import org.apache.oodt.cas.filemgr.structs.Product; +import org.apache.oodt.cas.filemgr.structs.Query; +import org.apache.oodt.cas.filemgr.structs.Reference; +import org.apache.oodt.cas.filemgr.structs.query.ComplexQuery; +import org.apache.oodt.cas.filemgr.structs.query.QueryResult; +import org.apache.oodt.cas.metadata.Metadata; + +//JDK imports +import java.net.URL; +import java.util.List; + + +/** + * @author radu + * + * <p>Interface of client for FileManager RPC logic. All methods that are used for + * RPC transfer.</p> + */ +public interface FileManagerClient { + + public boolean refreshConfigAndPolicy(); + + public boolean isAlive(); + + public boolean transferringProduct(Product product)throws DataTransferException; + + public boolean removeProductTransferStatus(Product product)throws DataTransferException; + + public boolean isTransferComplete(Product product)throws DataTransferException; + + public boolean moveProduct(Product product, String newPath)throws DataTransferException; + + public boolean modifyProduct(Product product) throws CatalogException; + + public boolean removeProduct(Product product) throws CatalogException; + + @SuppressWarnings("unchecked") + public FileTransferStatus getCurrentFileTransfer()throws DataTransferException; + + @SuppressWarnings("unchecked") + public List<FileTransferStatus> getCurrentFileTransfers()throws DataTransferException; + + public double getProductPctTransferred(Product product)throws DataTransferException; + + public double getRefPctTransferred(Reference reference)throws DataTransferException; + + @SuppressWarnings("unchecked") + public ProductPage pagedQuery(Query query, ProductType type, int pageNum)throws CatalogException; + + @SuppressWarnings("unchecked") + public ProductPage getFirstPage(ProductType type) throws CatalogException; + + @SuppressWarnings("unchecked") + public ProductPage getLastPage(ProductType type) throws CatalogException; + + @SuppressWarnings("unchecked") + public ProductPage getNextPage(ProductType type, ProductPage currPage)throws CatalogException; + + @SuppressWarnings("unchecked") + public ProductPage getPrevPage(ProductType type, ProductPage currPage)throws CatalogException; + + public String addProductType(ProductType type)throws RepositoryManagerException; + + public boolean hasProduct(String productName) throws CatalogException; + + public int getNumProducts(ProductType type) throws CatalogException; + + @SuppressWarnings("unchecked") + public List<Product> getTopNProducts(int n) throws CatalogException; + + @SuppressWarnings("unchecked") + public List<Product> getTopNProducts(int n, ProductType type)throws CatalogException; + + public void setProductTransferStatus(Product product)throws CatalogException; + + public void addProductReferences(Product product) throws CatalogException; + + public void addMetadata(Product product, Metadata metadata)throws CatalogException; + + public boolean updateMetadata(Product product, Metadata met)throws CatalogException; + + public String catalogProduct(Product product) throws CatalogException; + + @SuppressWarnings("unchecked") + public Metadata getMetadata(Product product) throws CatalogException; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Metadata getReducedMetadata(Product product, List<?> elements)throws CatalogException; + + public boolean removeFile(String filePath) throws DataTransferException; + + public byte[] retrieveFile(String filePath, int offset, int numBytes)throws DataTransferException; + + public void transferFile(String filePath, byte[] fileData, int offset,int numBytes) throws DataTransferException; + + @SuppressWarnings("unchecked") + public List<Product> getProductsByProductType(ProductType type)throws CatalogException; + + @SuppressWarnings("unchecked") + public List<Element> getElementsByProductType(ProductType type)throws ValidationLayerException; + + @SuppressWarnings("unchecked") + public Element getElementById(String elementId)throws ValidationLayerException; + + @SuppressWarnings("unchecked") + public Element getElementByName(String elementName)throws ValidationLayerException; + + public List<QueryResult> complexQuery(ComplexQuery complexQuery)throws CatalogException; + + @SuppressWarnings("unchecked") + public List<Product> query(Query query, ProductType type)throws CatalogException; + + @SuppressWarnings("unchecked") + public ProductType getProductTypeByName(String productTypeName)throws RepositoryManagerException; + + @SuppressWarnings("unchecked") + public ProductType getProductTypeById(String productTypeId)throws RepositoryManagerException; + + @SuppressWarnings("unchecked") + public List<ProductType> getProductTypes()throws RepositoryManagerException; + + @SuppressWarnings("unchecked") + public List<Reference> getProductReferences(Product product)throws CatalogException; + + @SuppressWarnings("unchecked") + public Product getProductById(String productId) throws CatalogException; + + @SuppressWarnings("unchecked") + public Product getProductByName(String productName) throws CatalogException; + + public String ingestProduct(Product product, Metadata metadata,boolean clientTransfer) throws Exception; + + @SuppressWarnings("unchecked") + public Metadata getCatalogValues(Metadata metadata, ProductType productType)throws Exception; + + @SuppressWarnings("unchecked") + public Metadata getOrigValues(Metadata metadata, ProductType productType)throws Exception; + + @SuppressWarnings("unchecked") + public Query getCatalogQuery(Query query, ProductType productType)throws Exception; + + public URL getFileManagerUrl(); + + public void setFileManagerUrl(URL fileManagerUrl); + + public DataTransfer getDataTransfer(); + + public void setDataTransfer(DataTransfer dataTransfer); + +} \ No newline at end of file Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClientMain.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClientMain.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClientMain.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerClientMain.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +import org.apache.oodt.cas.cli.CmdLineUtility; + +/** + * @author radu + * + * <p>Runs the {@link FileManagerClient} interface</p> + * + */ +public class FileManagerClientMain { + + public static void main(String[] args) { + CmdLineUtility cmdLineUtility = new CmdLineUtility(); + cmdLineUtility.run(args); + } + +} Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServer.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServer.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServer.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServer.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +//OODT imports +import org.apache.oodt.cas.filemgr.catalog.Catalog; + +/** + * @author radu + * + * <p>Interface of server for FileManager RPC logic.</p> + * + */ +public interface FileManagerServer { + + /** + * + * <p>Preparing and starting up the rpc server.</p> + * + * @return + * @throws Exception + * If any error occurs while starting up the server. + */ + public boolean startUp() throws Exception; + + /** + * + * <p>Shutting down the server.</p> + * + * + * @return + */ + public boolean shutdown(); + + /** + * + * <p>Verifying if the server is alive.</p> + * + * @return + */ + public boolean isAlive(); + + /** + * + * <p>Passing the {@link Catalog} to {@link FileManager} implementation.</p> + * + * @param catalog + * {@link Catalog} for {@link FileManager} + */ + public void setCatalog(Catalog catalog); +} Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServerMain.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServerMain.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServerMain.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/FileManagerServerMain.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +import org.apache.oodt.cas.filemgr.util.RpcCommunicationFactory; +/** + * @author radu + * + * <p>Runs the {@link FileManagerServer} interface</p> + * + */ +public class FileManagerServerMain { + + public static void main(String[] args) throws Exception { + + int portNum = -1; + + String usage = "FileManager --portNum <port number for rpc service>\n"; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--portNum")) { + portNum = Integer.parseInt(args[++i]); + } + } + + if (portNum == -1) { + System.err.println(usage); + System.exit(1); + } + + @SuppressWarnings("unused") + + FileManagerServer manager = RpcCommunicationFactory.createServer(portNum); + manager.startUp(); + + + for (;;) + try { + Thread.currentThread().join(); + } catch (InterruptedException ignore) { + } + } +} Modified: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerClient.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerClient.java?rev=1701724&r1=1701723&r2=1701724&view=diff ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerClient.java (original) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerClient.java Tue Sep 8 04:25:17 2015 @@ -17,30 +17,17 @@ package org.apache.oodt.cas.filemgr.system; -//APACHE imports import org.apache.xmlrpc.CommonsXmlRpcTransport; import org.apache.xmlrpc.XmlRpcClient; import org.apache.xmlrpc.XmlRpcClientException; import org.apache.xmlrpc.XmlRpcException; import org.apache.xmlrpc.XmlRpcTransport; import org.apache.xmlrpc.XmlRpcTransportFactory; -//JDK imports -import java.net.URL; -import java.util.Hashtable; -import java.util.List; -import java.util.Vector; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.File; -//OODT imports import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethod; import org.apache.commons.httpclient.HttpMethodRetryHandler; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.oodt.cas.metadata.Metadata; -import org.apache.oodt.cas.cli.CmdLineUtility; import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; import org.apache.oodt.cas.filemgr.structs.exceptions.RepositoryManagerException; import org.apache.oodt.cas.filemgr.structs.exceptions.ValidationLayerException; @@ -60,7 +47,13 @@ import org.apache.oodt.cas.filemgr.struc import org.apache.oodt.cas.filemgr.structs.exceptions.ConnectionException; import org.apache.oodt.cas.filemgr.structs.query.ComplexQuery; import org.apache.oodt.cas.filemgr.structs.query.QueryResult; - +import java.net.URL; +import java.util.Hashtable; +import java.util.List; +import java.util.Vector; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.io.IOException; /** * @author mattmann (Chris Mattmann) * @author bfoster (Brian Foster) @@ -71,7 +64,7 @@ import org.apache.oodt.cas.filemgr.struc * </p> * */ -public class XmlRpcFileManagerClient { +public class XmlRpcFileManagerClient implements FileManagerClient { /* our xml rpc client */ private XmlRpcClient client = null; @@ -94,7 +87,7 @@ public class XmlRpcFileManagerClient { * <p> * Constructs a new XmlRpcFileManagerClient with the given <code>url</code>. * </p> - * + * * @param url * The url pointer to the xml rpc file manager service. * @param testConnection @@ -102,23 +95,6 @@ public class XmlRpcFileManagerClient { */ public XmlRpcFileManagerClient(final URL url, boolean testConnection) throws ConnectionException { - // set up the configuration, if there is any - if (System.getProperty("org.apache.oodt.cas.filemgr.properties") != null) { - String configFile = System - .getProperty("org.apache.oodt.cas.filemgr.properties"); - LOG.log(Level.INFO, - "Loading File Manager Configuration Properties from: [" - + configFile + "]"); - try { - System.getProperties().load( - new FileInputStream(new File(configFile))); - } catch (Exception e) { - LOG.log(Level.INFO, - "Error loading configuration properties from: [" - + configFile + "]"); - } - - } XmlRpcTransportFactory transportFactory = new XmlRpcTransportFactory() { @@ -654,6 +630,7 @@ public class XmlRpcFileManagerClient { return topNProductList; } + @SuppressWarnings("unchecked") public List<Product> getTopNProducts(int n, ProductType type) throws CatalogException { @@ -967,31 +944,6 @@ public class XmlRpcFileManagerClient { } } - @SuppressWarnings("unchecked") - public Element getElementByName(String elementName, ProductType type) - throws ValidationLayerException { - Vector<Object> argList = new Vector<Object>(); - argList.add(elementName); - argList.add(XmlRpcStructFactory.getXmlRpcProductType(type)); - - Hashtable<String, Object> elementHash = null; - - try { - elementHash = (Hashtable<String, Object>) client.execute( - "filemgr.getElementByName", argList); - } catch (XmlRpcException e) { - throw new ValidationLayerException(e.getMessage()); - } catch (IOException e) { - throw new ValidationLayerException(e.getMessage()); - } - - if (elementHash == null) { - return null; - } else { - return XmlRpcStructFactory.getElementFromXmlRpc(elementHash); - } - } - public List<QueryResult> complexQuery(ComplexQuery complexQuery) throws CatalogException { try { @@ -1344,12 +1296,6 @@ public class XmlRpcFileManagerClient { .getQueryFromXmlRpc((Hashtable<String, Object>) this.client .execute("filemgr.getCatalogQuery", args)); } - - public static void main(String[] args) { - CmdLineUtility cmdLineUtility = new CmdLineUtility(); - cmdLineUtility.run(args); - } - /** * @return Returns the fileManagerUrl. */ Added: oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerServer.java URL: http://svn.apache.org/viewvc/oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerServer.java?rev=1701724&view=auto ============================================================================== --- oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerServer.java (added) +++ oodt/branches/avro_rpc/filemgr/src/main/java/org/apache/oodt/cas/filemgr/system/XmlRpcFileManagerServer.java Tue Sep 8 04:25:17 2015 @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system; + +import org.apache.oodt.cas.filemgr.catalog.Catalog; +import org.apache.oodt.cas.filemgr.datatransfer.DataTransfer; +import org.apache.oodt.cas.filemgr.structs.FileTransferStatus; +import org.apache.oodt.cas.filemgr.structs.ProductPage; +import org.apache.oodt.cas.filemgr.structs.ProductType; +import org.apache.oodt.cas.filemgr.structs.Product; +import org.apache.oodt.cas.filemgr.structs.Query; +import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; +import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException; +import org.apache.oodt.cas.filemgr.structs.exceptions.ValidationLayerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.RepositoryManagerException; +import org.apache.oodt.cas.filemgr.structs.exceptions.QueryFormulationException; +import org.apache.oodt.cas.filemgr.structs.exceptions.VersioningException; +import org.apache.oodt.cas.filemgr.structs.query.ComplexQuery; +import org.apache.oodt.cas.filemgr.util.GenericFileManagerObjectFactory; +import org.apache.oodt.cas.filemgr.util.XmlRpcStructFactory; +import org.apache.oodt.cas.metadata.Metadata; +import org.apache.xmlrpc.WebServer; +import java.io.IOException; +import java.net.URL; +import java.util.Hashtable; +import java.util.List; +import java.util.Vector; + +public class XmlRpcFileManagerServer implements FileManagerServer { + + /* the port to run the XML RPC web server on, default is 1999 */ + protected int port = 1999; + + /* our xml rpc web server */ + private WebServer webServer = null; + + /* file manager tools */ + FileManager fileManager; + + public XmlRpcFileManagerServer(int port){ + this.port = port; + } + + @Override + public boolean isAlive() { + return true; + } + + @Override + public void setCatalog(Catalog catalog) { + this.fileManager.setCatalog(catalog); + } + + + @Override + public boolean startUp() throws Exception { + webServer = new WebServer(this.port); + webServer.addHandler("filemgr", this); + webServer.start(); + this.fileManager = new FileManager(); + this.loadConfiguration(); + return true; + } + + public void loadConfiguration() throws IOException { + fileManager.loadConfiguration(); + + String transferFactory = null; + + transferFactory = System.getProperty("filemgr.datatransfer.factory", + "org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory"); + + DataTransfer dataTransfer = GenericFileManagerObjectFactory + .getDataTransferServiceFromFactory(transferFactory); + + dataTransfer + .setFileManagerUrl(new URL("http://localhost:" + port)); + fileManager.setDataTransfer(dataTransfer); + + } + + public boolean refreshConfigAndPolicy() { + + boolean success = fileManager.refreshConfigAndPolicy(); + try { + String transferFactory = null; + + transferFactory = System.getProperty("filemgr.datatransfer.factory", + "org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory"); + + DataTransfer dataTransfer = GenericFileManagerObjectFactory + .getDataTransferServiceFromFactory(transferFactory); + + dataTransfer + .setFileManagerUrl(new URL("http://localhost:" + port)); + fileManager.setDataTransfer(dataTransfer); + + + fileManager.loadConfiguration(); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + return success; + + } + + public boolean transferringProduct(Hashtable<String, Object> productHash) { + return fileManager.transferringProduct(XmlRpcStructFactory.getProductFromXmlRpc(productHash)); + } + + public Hashtable<String, Object> getCurrentFileTransfer() { + FileTransferStatus status = fileManager.getCurrentFileTransfer(); + if (status == null) { + return new Hashtable<String, Object>(); + } else + return XmlRpcStructFactory.getXmlRpcFileTransferStatus(status); + } + + public Vector<Hashtable<String, Object>> getCurrentFileTransfers() { + List<FileTransferStatus> currentTransfers = fileManager.getCurrentFileTransfers(); + + if (currentTransfers != null && currentTransfers.size() > 0) { + return XmlRpcStructFactory + .getXmlRpcFileTransferStatuses(currentTransfers); + } else + return new Vector<Hashtable<String, Object>>(); + } + + public double getRefPctTransferred(Hashtable<String, Object> refHash) { + return fileManager.getRefPctTransferred(XmlRpcStructFactory + .getReferenceFromXmlRpc(refHash)); + } + + public boolean removeProductTransferStatus(Hashtable<String, Object> productHash) { + return fileManager.removeProductTransferStatus(XmlRpcStructFactory.getProductFromXmlRpc(productHash)); + } + + public boolean isTransferComplete(Hashtable<String, Object> productHash) { + return fileManager.isTransferComplete(XmlRpcStructFactory.getProductFromXmlRpc(productHash)) ; + } + + public Hashtable<String, Object> pagedQuery( + Hashtable<String, Object> queryHash, + Hashtable<String, Object> productTypeHash, + int pageNum) throws CatalogException { + + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + Query query = XmlRpcStructFactory.getQueryFromXmlRpc(queryHash); + + return XmlRpcStructFactory.getXmlRpcProductPage(fileManager.pagedQuery(query, type, pageNum)); + } + + + public Hashtable<String, Object> getFirstPage( + Hashtable<String, Object> productTypeHash) { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + + return XmlRpcStructFactory.getXmlRpcProductPage(fileManager.getFirstPage(type)); + } + + public Hashtable<String, Object> getLastPage( + Hashtable<String, Object> productTypeHash) { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + + return XmlRpcStructFactory.getXmlRpcProductPage(fileManager.getLastPage(type)); + } + + public Hashtable<String, Object> getNextPage( + Hashtable<String, Object> productTypeHash, + Hashtable<String, Object> currentPageHash) { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + ProductPage currPage = XmlRpcStructFactory + .getProductPageFromXmlRpc(currentPageHash); + + return XmlRpcStructFactory.getXmlRpcProductPage(fileManager.getNextPage(type, currPage)); + } + + public double getProductPctTransferred(Hashtable<String, Object> productHash) { + return this.fileManager.getProductPctTransferred(XmlRpcStructFactory.getProductFromXmlRpc(productHash)); + } + + + public Hashtable<String, Object> getPrevPage( + Hashtable<String, Object> productTypeHash, + Hashtable<String, Object> currentPageHash) { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + ProductPage currPage = XmlRpcStructFactory + .getProductPageFromXmlRpc(currentPageHash); + + return XmlRpcStructFactory.getXmlRpcProductPage(fileManager.getPrevPage(type, currPage)); + } + + public String addProductType(Hashtable<String, Object> productTypeHash) + throws RepositoryManagerException { + ProductType productType = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + + return fileManager.addProductType(productType); + + } + + public synchronized boolean setProductTransferStatus( + Hashtable<String, Object> productHash) + throws CatalogException { + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + + return fileManager.setProductTransferStatus(product); + } + + public int getNumProducts(Hashtable<String, Object> productTypeHash) + throws CatalogException { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + + return fileManager.getNumProducts(type); + } + + public Vector<Hashtable<String, Object>> getTopNProducts(int n) + throws CatalogException { + return XmlRpcStructFactory.getXmlRpcProductList(fileManager.getTopNProducts(n)); + } + + + public Vector<Hashtable<String, Object>> getTopNProducts(int n, + Hashtable<String, Object> productTypeHash) + throws CatalogException { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + return XmlRpcStructFactory.getXmlRpcProductList(fileManager.getTopNProductsByProductType(n, type)); + } + + + public boolean hasProduct(String productName) throws CatalogException { + return fileManager.hasProduct(productName); + } + + public Hashtable<String, Object> getMetadata( + Hashtable<String, Object> productHash) throws CatalogException { + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.getMetadata(product).getHashtable(); + } + + public Hashtable<String, Object> getReducedMetadata( + Hashtable<String, Object> productHash, Vector<String> elements) + throws CatalogException { + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.getReducedMetadata(product, elements).getHashtable(); + } + + public Vector<Hashtable<String, Object>> getProductTypes() + throws RepositoryManagerException { + return XmlRpcStructFactory.getXmlRpcProductTypeList(fileManager.getProductTypes()); + } + + public Vector<Hashtable<String, Object>> getProductReferences( + Hashtable<String, Object> productHash) + throws CatalogException { + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return XmlRpcStructFactory.getXmlRpcReferences(fileManager.getProductReferences(product)); + } + + public Hashtable<String, Object> getProductById(String productId) + throws CatalogException { + Product product = fileManager.getProductById(productId); + return XmlRpcStructFactory.getXmlRpcProduct(product); + } + + public Hashtable<String, Object> getProductByName(String productName) + throws CatalogException { + + Product product = fileManager.getProductByName(productName); + return XmlRpcStructFactory.getXmlRpcProduct(product); + } + + public Vector<Hashtable<String, Object>> getProductsByProductType( + Hashtable<String, Object> productTypeHash) + throws CatalogException { + ProductType type = XmlRpcStructFactory.getProductTypeFromXmlRpc(productTypeHash); + return XmlRpcStructFactory.getXmlRpcProductList(fileManager.getProductsByProductType(type)); + } + + public Vector<Hashtable<String, Object>> getElementsByProductType( + Hashtable<String, Object> productTypeHash) throws ValidationLayerException { + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + return XmlRpcStructFactory.getXmlRpcElementList(fileManager.getElementsByProductType(type)); + } + + public Hashtable<String, Object> getElementById(String elementId) + throws ValidationLayerException { + return XmlRpcStructFactory.getXmlRpcElement(fileManager.getElementById(elementId)); + } + + public Hashtable<String, Object> getElementByName(String elementName) + throws ValidationLayerException { + return XmlRpcStructFactory.getXmlRpcElement(fileManager.getElementByName(elementName)); + } + + public Vector<Hashtable<String, Object>> complexQuery( + Hashtable<String, Object> complexQueryHash) throws CatalogException { + ComplexQuery complexQuery = XmlRpcStructFactory + .getComplexQueryFromXmlRpc(complexQueryHash); + return XmlRpcStructFactory.getXmlRpcQueryResults(fileManager.complexQuery(complexQuery)); + } + + public Vector<Hashtable<String, Object>> query( + Hashtable<String, Object> queryHash, + Hashtable<String, Object> productTypeHash) + throws CatalogException { + Query query = XmlRpcStructFactory.getQueryFromXmlRpc(queryHash); + ProductType type = XmlRpcStructFactory + .getProductTypeFromXmlRpc(productTypeHash); + return XmlRpcStructFactory.getXmlRpcProductList(fileManager.query(query, type)); + } + + public Hashtable<String, Object> getProductTypeByName(String productTypeName) + throws RepositoryManagerException { + return XmlRpcStructFactory.getXmlRpcProductType(fileManager.getProductTypeByName(productTypeName)); + } + + public Hashtable<String, Object> getProductTypeById(String productTypeId) + throws RepositoryManagerException { + return XmlRpcStructFactory.getXmlRpcProductType(fileManager.getProductTypeById(productTypeId)); + } + + public boolean updateMetadata(Hashtable<String, Object> productHash, + Hashtable<String, Object> metadataHash) throws CatalogException{ + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + Metadata met = new Metadata(); + met.addMetadata(metadataHash); + fileManager.updateMetadata(product, met); + return true; + } + + public String catalogProduct(Hashtable<String, Object> productHash) + throws CatalogException { + Product p = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.catalogProduct(p); + } + + public synchronized boolean addMetadata(Hashtable<String, Object> productHash, + Hashtable<String, String> metadata) throws CatalogException { + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + Metadata met = new Metadata(); + met.addMetadata((Hashtable)metadata); + return fileManager.addMetadata(product, met) != null; + } + + public synchronized boolean addProductReferences(Hashtable<String, Object> productHash) + throws CatalogException { + Product product = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.addProductReferences(product); + } + + public String ingestProduct(Hashtable<String, Object> productHash, + Hashtable<String, String> metadata, boolean clientTransfer) + throws VersioningException, RepositoryManagerException, + DataTransferException, CatalogException { + + Product p = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + + Metadata m = new Metadata(); + m.addMetadata((Hashtable)metadata); + + return fileManager.ingestProduct(p, m, clientTransfer); + } + + public byte[] retrieveFile(String filePath, int offset, int numBytes) + throws DataTransferException { + return fileManager.retrieveFile(filePath, offset, numBytes); + } + + public boolean transferFile(String filePath, byte[] fileData, int offset, + int numBytes) { + return fileManager.transferFile(filePath, fileData, offset, numBytes); + } + + public boolean moveProduct(Hashtable<String, Object> productHash, String newPath) + throws DataTransferException { + Product p = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.moveProduct(p, newPath); + } + + public boolean removeFile(String filePath) throws DataTransferException, IOException { + return fileManager.removeFile(filePath); + } + + public boolean modifyProduct(Hashtable<?, ?> productHash) throws CatalogException { + Product p = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.modifyProduct(p); + } + + public boolean removeProduct(Hashtable<String, Object> productHash) throws CatalogException { + Product p = XmlRpcStructFactory.getProductFromXmlRpc(productHash); + return fileManager.removeProduct(p); + } + + public Hashtable<String, Object> getCatalogValues( + Hashtable<String, Object> metadataHash, + Hashtable<String, Object> productTypeHash) + throws RepositoryManagerException { + Metadata m = new Metadata(); + m.addMetadata(metadataHash); + ProductType productType = XmlRpcStructFactory.getProductTypeFromXmlRpc(productTypeHash); + return fileManager.getCatalogValues(m, productType).getHashtable(); + } + + public Hashtable<String, Object> getOrigValues( + Hashtable<String, Object> metadataHash, + Hashtable<String, Object> productTypeHash) + throws RepositoryManagerException { + Metadata m = new Metadata(); + m.addMetadata(metadataHash); + ProductType productType = XmlRpcStructFactory.getProductTypeFromXmlRpc(productTypeHash); + return fileManager.getOrigValues(m, productType).getHashtable(); + } + + public Hashtable<String, Object> getCatalogQuery( + Hashtable<String, Object> queryHash, + Hashtable<String, Object> productTypeHash) + throws RepositoryManagerException, QueryFormulationException { + Query query = XmlRpcStructFactory.getQueryFromXmlRpc(queryHash); + ProductType productType = XmlRpcStructFactory.getProductTypeFromXmlRpc(productTypeHash); + return XmlRpcStructFactory.getXmlRpcQuery(fileManager.getCatalogQuery(query, productType)); + } + + public boolean shutdown() { + if (this.webServer != null) { + this.webServer.shutdown(); + this.webServer = null; + return true; + } else + return false; + } + +}
