Repository: oodt Updated Branches: refs/heads/master 4f3588ce8 -> cb0c88299
update and clean up FM and WM. Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/cb0c8829 Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/cb0c8829 Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/cb0c8829 Branch: refs/heads/master Commit: cb0c882992516a6bd16f9376eb0fe8cdbec6ed82 Parents: 4f3588c Author: Chris Mattmann <[email protected]> Authored: Sun Jul 16 23:10:17 2017 -0700 Committer: Chris Mattmann <[email protected]> Committed: Sun Jul 16 23:10:17 2017 -0700 ---------------------------------------------------------------------- agility/oodt/filemgr.py | 554 +++++++++++++++++++++++++++++++++++++++++++ agility/oodt/fm.py | 554 ------------------------------------------- 2 files changed, 554 insertions(+), 554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/cb0c8829/agility/oodt/filemgr.py ---------------------------------------------------------------------- diff --git a/agility/oodt/filemgr.py b/agility/oodt/filemgr.py new file mode 100644 index 0000000..5eed046 --- /dev/null +++ b/agility/oodt/filemgr.py @@ -0,0 +1,554 @@ +#!/usr/bin/env python +# encoding: utf-8 +#/* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ +''' +CAS Filemgr Python Server + +This is the Python server API for an authenticated Catalog and Archive System. It uses the +OODT Catalog and Archive System as its core, adding user/password-based authentication +and role-based authorization. +''' + +import sys, os, os.path +import getopt, sha, pickle + +from ConfigParser import ConfigParser +from org.apache.oodt.cas.filemgr.system.auth import SecureWebServer, Dispatcher, Result +from org.apache.oodt.cas.filemgr.datatransfer import TransferStatusTracker +from org.apache.oodt.cas.filemgr.structs import Product +from org.apache.oodt.cas.filemgr.util import GenericFileManagerObjectFactory +from org.apache.oodt.cas.filemgr.util import XmlRpcStructFactory as Structs +from org.apache.oodt.cas.metadata import Metadata +from java.lang import Boolean, Double, Integer +from java.util import Hashtable, Vector + +# We choose these default factory classes because it minimizes our dependencies +# on heavyweight external packages, like smelly old SQL databases. +_defaultFactories = { + 'catalog': 'org.apache.oodt.cas.filemgr.catalog.LuceneCatalogFactory', + 'repository': 'org.apache.oodt.cas.filemgr.repository.XMLRepositoryManagerFactory', + 'datatransfer': 'org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory', + 'validation': 'org.apache.oodt.cas.filemgr.validation.XMLValidationLayerFactory' +} + +# All available permissions. By default, the "root" user will be in the "wheel" +# group and will have these permissions. +_allPerms = [ + 'filemgr.addMetadata', + 'filemgr.addProductReferences', + 'filemgr.addProductType', + 'filemgr.catalogProduct', + 'filemgr.getCurrentFileTransfer', + 'filemgr.getCurrentFileTransfers', + 'filemgr.getElementById', + 'filemgr.getElementByName', + 'filemgr.getElementsByProductType', + 'filemgr.getFirstPage', + 'filemgr.getLastPage', + 'filemgr.getMetadata', + 'filemgr.getNextPage', + 'filemgr.getNumProducts', + 'filemgr.getPrevPage', + 'filemgr.getProductById', + 'filemgr.getProductByName', + 'filemgr.getProductPctTransferred', + 'filemgr.getProductReferences', + 'filemgr.getProductsByProductType', + 'filemgr.getProductTypeById', + 'filemgr.getProductTypeByName', + 'filemgr.getProductTypes', + 'filemgr.getRefPctTransferred', + 'filemgr.getTopNProducts', + 'filemgr.handleRequest', + 'filemgr.hasProduct', + 'filemgr.ingestProduct', + 'filemgr.isTransferComplete', + 'filemgr.pagedQuery', + 'filemgr.query', + 'filemgr.removeFile', + 'filemgr.removeProductTransferStatus', + 'filemgr.setProductTransferStatus', + 'filemgr.transferFile', + 'filemgr.transferringProduct', + 'usermgr.addGroup', + 'usermgr.addPermissionToGroup', + 'usermgr.addUser', + 'usermgr.addUserToGroup', + 'usermgr.removeGroup', + 'usermgr.removePermissionFromGroup', + 'usermgr.removeUser', + 'usermgr.removeUserFromGroup' +] + +def _toJavaBoolean(truthiness): + '''Convert a Python boolean into the string format that Java uses: true or false. + ''' + if truthiness: + return 'true' + else: + return 'false' + + +def _encodePassword(pw): + '''Encode a password using an SHA-1 digest. + ''' + return sha.new(pw).digest() + + +class User: + '''A user of the CAS. Users don't have permissions directly; instead they receive + them implicitly by being members of groups, which do have permissions. + ''' + def __init__(self, userID, name, email, password, groups=[]): + self.userID, self.name, self.email, self.password, self.groups = userID, name, email, password, groups + + def __cmp__(self, other): + return cmp(self.userID, other.userID) + + def __hash__(self): + return hash(self.userID) + + def __repr__(self): + return 'User(userID=%s,name=%s,email=%s,password=%s,groups=%r)' % ( + self.userID, self.name, self.email, self.password, self.groups + ) + + +class Group: + '''A CAS group. The group contains a sequence of permissions, which are strings + that name the XML-RPC methods that the group is allowed to call. + ''' + def __init__(self, groupID, name, email, perms=[]): + self.groupID, self.name, self.email, self.perms = groupID, name, email, perms + + def __cmp__(self, other): + return cmp(self.groupID, other.groupID) + + def __hash__(self): + return hash(self.groupID) + + def __repr__(self): + return 'Group(groupdID=%s,name=%s,email=%s,perms=%r)' % (self.groupdID, self.name, self.email, self.perms) + + +class UserDB: + '''The user database records all the users and groups. + ''' + def __init__(self, users, groups, filename): + self.users, self.groups, self.filename = users, groups, filename + + def authenticate(self, name, password): + '''Authenticate a user by checking the user name and password. + Return true if the user's password matches the given one. The + password given should be in SHA-1 digest format. + ''' + user = self.users[name] + return user.userID == name and user.password == password + + def authorize(self, name, perm): + '''Authorize if the user has the given permission. Return true if + the user can do it, false otherwise. + ''' + user = self.users[name] + for group in user.groups: + if perm in group.perms: + return True + return False + + def save(self): + '''Save the user database to disk. + ''' + f = file(self.filename, 'wb') + pickle.dump(self, f) + f.close() + + +class FileMgrDispatcher(Dispatcher): + '''The file manager dispatcher handles all XML-RPC calls. + ''' + def __init__(self, catalog, repo, xfer, userDB): + self.catalog, self.repo, self.xfer, self.userDB = catalog, repo, xfer, userDB + self.tracker = TransferStatusTracker(self.catalog) + + def handleRequest(self, methodSpecifier, params, user, password): + '''Handle an XML-RPC request. First, authenticate the user. If the user's + authentic (by dint of providing a correct user ID and password pair), then + authorize if the method the user is trying to call is available. + ''' + password = _encodePassword(password) + if self.userDB.authenticate(user, password): + if self.userDB.authorize(user, methodSpecifier): + obj, method = methodSpecifier.split('.') + if obj not in ('filemgr', 'usermgr'): + raise ValueError('Unknown object') + func = getattr(self, method) + return func(params) + raise ValueError('Not authorized for "%s"' % methodSpecifier) + raise ValueError('Illegal user name "%s" and/or password' % user) + + def getProductTypeByName(self, params): + return Result(None, Structs.getXmlRpcProductType(self.repo.getProductTypeByName(params[0]))) + + def ingestProduct(self, params): + productHash, metadata, clientXfer = params + p = Structs.getProductFromXmlRpc(productHash) + p.setTransferStatus(Product.STATUS_TRANSFER) + self.catalog.addProduct(p) + + m = Metadata() + m.addMetadata(metadata) + self.catalog.addMetadata(m, p) + + if not clientXfer: + versioner = GenericFileManagerObjectFactory.getVersionerFromClassName(p.getProductType().getVersioner()) + versioner.createDataStoreReferences(p, m) + self.catalog.addProductReferences(p) + self.xfer.transferProduct(p) + p.setTransferStatus(Product.STATUS_RECEIVED) + self.catalog.setProductTranfserStatus(p) + return Result(None, p.getProductId()) + + def addProductReferences(self, params): + self.catalog.addProductReferences(Structs.getProductFromXmlRpc(params[0])) + return Result(Boolean, 'true') + + def transferringProduct(self, params): + self.tracker.transferringProduct(Structs.getProductFromXmlRpc(params[0])) + return Result(Boolean, 'true') + + def removeProductTransferStatus(self, params): + self.tracker.removeProductTransferStatus(Structs.getProductFromXmlRpc(params[0])) + return Result(Boolean, 'true') + + def setProductTransferStatus(self, params): + self.catalog.setProductTransferStatus(Structs.getProductFromXmlRpc(params[0])) + return Result(Boolean, 'true') + + def getCurrentFileTransfer(self, params): + status = self.tracker.getCurrentFileTransfer() + if status is None: + return Result(None, Hashtable()) + else: + return Result(None, Structs.getXmlRpcFileTransferStatus(status)) + + def getCurrentFileTransfers(self, params): + xfers = self.tracker.getCurrentFileTransfers() + if xfers is not None and len(xfers) > 0: + return Result(None, Structs.getXmlRpcFileTransferStatuses(xfers)) + else: + return Result(None, Vector()) + + def getProductPctTransferred(self, params): + return Result(Double, str(self.tracker.getPctTransferred((Structs.getProductFromXmlRpc(params[0]))))) + + def getRefPctTransferred(self, params): + pct = self.tracker.getPctTransferred(Structs.getReferenceFromXmlRpc(params[0])) + return Result(Double, str(pct)) + + def isTransferComplete(self, params): + return Result(Boolean, _toJavaBoolean(self.tracker.isTransferComplete(Structs.getProductFromXmlRpc(params[0])))) + + def pagedQuery(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[1]) + query = Structs.getQueryFromXmlRpc(params[0]) + return Result(None, Structs.getXmlRpcProductPage(self.catalog.pagedQuery(query, ptype, params[2]))) + + def getFirstPage(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + return Result(None, Structs.getXmlRpcProductPage(self.catalog.getFirstPage(ptype))) + + def getLastPage(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + return Result(None, Structs.getXmlRpcProductPage(self.catalog.getLastProductPage(ptype))) + + def getNextPage(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + page = Structs.getProductPageFromXmlRpc(params[1]) + return Result(None, Structs.getXmlRpcProductPage(self.catalog.getNextPage(ptype, page))) + + def getPrevPage(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + page = Structs.getProductPageFromXmlRpc(params[1]) + return Result(None, Structs.getXmlRpcProductPage(self.catalog.getPrevPage(ptype, page))) + + def addProductType(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + self.repo.addProductType(ptype) + return Result(None, ptype.getProductTypeId()) + + def getNumProducts(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + return Result(Integer, str(self.catalog.getNumProducts(ptype))) + + def getTopNProducts(self, params): + if len(params) == 1: + return Result(None, Structs.getXmlRpcProductList(self.catalog.getTopNProducts(params[0]))) + ptype = Structs.getProductTypeFromXmlRpc(params[1]) + return Result(None, Structs.getXmlRpcProductList(self.catalog.getTopNProducts(params[0], ptype))) + + def hasProduct(self, params): + p = self.catalog.getProductByName(params[0]) + return Result(Boolean, _toJavaBoolean(p is not None and p.transferStatus == Product.STATUS_RECEIVED)) + + def getMetadata(self, params): + return Result(None, self.catalog.getMetadata(Structs.getProductFromXmlRpc(params[0])).getHashtable()) + + def getProductTypes(self, params): + return Result(None, Structs.getXmlRpcProductList(self.repo.getProductTypes())) + + def getProductReferences(self, params): + p = Structs.getProductFromXmlRpc(params[0]) + return Result(None, Structs.getXmlRpcReferences(self.catalog.getProductReferences(p))) + + def getProductById(self, params): + return Result(None, Structs.getXmlRpcProduct(self.catalog.getProductById(params[0]))) + + def getProductByName(self, params): + return Result(None, Structs.getXmlRpcProduct(self.catalog.getProductByName(params[0]))) + + def getProductsByProductType(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + return Result(None, Structs.getXmlRpcProductList(self.catalog.getProductsByProductType(ptype))) + + def getElementsByProductType(self, params): + ptype = Structs.getProductTypeFromXmlRpc(params[0]) + return Structs.getXmlRpcElementList(self.catalog.getValidationLayer().getElements(ptype)) + + def getElementById(self, params): + return Structs.getXmlRpcElement(self.catalog.getValidationLayer().getElementById(params[0])) + + def getElementByName(self, params): + return Structs.getXmlRpcElement(self.catalog.getValidationLayer().getElementByName(params[0])) + + def query(self, params): + q = Structs.getQueryFromXmlRpc(params[0]) + ptype = Structs.getProductTypeFromXmlRpc(params[1]) + ids = self.catalog.query(q, ptype) + if ids is not None and len(ids) > 0: + return Result(None, [self.catalog.getProductById(i) for i in ids]) + return Result(None, Vector()) + + def getProductTypeById(self, params): + ptype = self.repo.getProductTypeById(params[0]) + return Result(None, Structs.getXmlRpcProductType(ptype)) + + def catalogProduct(self, params): + p = Structs.getProductFromXmlRpc(params[0]) + return Result(None, self.catalog.addProduct(p)) + + def addMetadata(self, params): + p = Structs.getProductFromXmlRpc(params[0]) + m = Metadata() + m.addMetadata(params[1]) + self.catalog.addMetadata(m, p) + return Result(Boolean, 'true') + + def transferFile(self, params): + outFile, data, offset, numBytes = params + if os.path.exists(outFile): + out = file(outFile, 'ab') + else: + dirPath = os.dirname(outFile) + os.makedirs(dirPath) + out = file(outFile, 'wb') + out.seek(offset) + out.write(data) + out.close() + return Result(Boolean, 'true') + + def removeFile(self, params): + os.remove(params[0]) + return Result(Boolean, 'true') + + def addUser(self, params): + userID = params[0] + user = User(userID, params[1], params[2], _encodePassword(params[3]), []) + self.userDB.users[userID] = user + self.userDB.save() + return Result(Boolean, 'true') + + def removeUser(self, params): + del self.userDB.users[params[0]] + return Result(Boolean, 'true') + + def addGroup(self, params): + groupID = params[0] + group = Group(groupID, params[1], params[2]) + self.userDB.groups[groupID] = group + self.userDB.save() + return Result(Boolean, 'true') + + def removeGroup(self, params): + groupID = params[0] + del self.userDB.groups[groupID] + for user in self.userDB.users.itervalues(): + indexes = [] + index = 0 + for group in user.groups: + if group.groupID == groupID: + indexes.append(index) + index += 1 + indexes.reverse() + for index in indexes: + del user.groups[index] + self.userDB.save() + return Result(Boolean, 'true') + + def addUserToGroup(self, params): + self.userDB.users[params[0]].groups.append(self.userDB.groups[params[1]]) + return Result(Boolean, 'true') + + def removeUserFromGroup(self, params): + groupID = params[1] + user = self.userDB.users[params[0]] + indexes = [] + index = 0 + for group in user.group: + if group.groupID == groupID: + indexes.append(index) + index += 1 + indexes.reverse() + for index in indexes: + del user.groups[index] + self.userDB.save() + return Result(Boolean, 'true') + + def addPermissionToGroup(self, params): + self.userDB.groups[params[0]].perms.append(params[1]) + self.userDB.save() + return Result(Boolean, 'true') + + def removePermissionFromGroup(self, params): + permName = params[1] + group = self.userDB.groups[params[0]] + indexes = [] + index = 0 + for perm in group.perms: + if perm == permName: + indexes.append(index) + indexes.reverse() + for index in indexes: + del group.perms[index] + self.userDB.save() + return Result(Boolean, 'true') + + +def _usage(): + '''Show a usage message to the stderr and quit. + ''' + print >>sys.stderr, 'Usage: %s [-c <configFile>]' % sys.argv[0] + print >>sys.stderr, ' or: %s [--config=<configFile>]' % sys.argv[0] + sys.exit(2) + + +def _parseCommandLine(): + '''Parse the command line options. If any. The only option is -c (or --config) + that names a configuration file to use. If none given, reasonable defaults are + used. Well, mostly reasonable. + ''' + try: + opts, args = getopt.getopt(sys.argv[1:], 'c:', 'config=') + except getopt.GetoptError: + _usage() + configFile = None + for option, arg in opts: + if option in ('-c', '--config'): + configFile = arg + if configFile is None or len(configFile) == 0: + _usage() + return configFile + + +def _getConfig(configFile): + '''Get the configuration. This populates a configuration with default values + and then overwrites them with the configFile, which may be None (in which case, + no overwriting happens). + ''' + configParser = ConfigParser() + configParser.add_section('factories') + for key, val in _defaultFactories.iteritems(): + configParser.set('factories', key, val) + + configParser.add_section('index') + configParser.set('index', 'path', 'index') + configParser.set('index', 'pageSize', '20') + + current = '/'.join(os.path.split(os.getcwd())) + configParser.add_section('policies') + configParser.set('policies', 'repo', 'file:%s/policy' % current) + configParser.set('policies', 'validation', 'file:%s/policy' % current) + configParser.set('policies', 'user', '%s/user.db' % current) + + if configFile is not None: + configParser.readfp(file(configFile)) + return configParser + + +def _setJavaProperties(config): + '''Set Java-based properties. The Java-based cas expects a whole bunch of + system properties to be set, sort of like global variables. Woot! Global + variables! + ''' + from java.lang import System + System.setProperty('org.apache.oodt.cas.filemgr.catalog.lucene.idxPath', config.get('index', 'path')) + System.setProperty('org.apache.oodt.cas.filemgr.catalog.lucene.pageSize', config.get('index', 'pageSize')) + System.setProperty('org.apache.oodt.cas.filemgr.repositorymgr.dirs', config.get('policies', 'repo')) + System.setProperty('org.apache.oodt.cas.filemgr.validation.dirs', config.get('policies', 'validation')) + System.setProperty('org.apache.oodt.cas.filemgr.datatransfer.remote.chunkSize', '1024') + System.setProperty('filemgr.repository.factory', config.get('factories', 'repository')) + System.setProperty('filemgr.catalog.factory', config.get('factories', 'catalog')) + System.setProperty('filemgr.datatransfer.factory', config.get('factories', 'datatransfer')) + System.setProperty('filemgr.validationLayer.factory', config.get('factories', 'validation')) + +def _getUserDB(path): + '''Get the user database, creating it if necessary. + ''' + try: + f = file(path, 'rb') + db = pickle.load(f) + f.close() + db.filename = path + db.save() + except: + wheel = Group('wheel', 'Administrators', '[email protected]', _allPerms) + root = User('root', 'Super User', '[email protected]', _encodePassword('poipu'), [wheel]) + db = UserDB({'root': root}, {'wheel': wheel}, path) + db.save() + return db + + +def main(): + '''Start the CAS Filemgr Backend. + ''' + configFile = _parseCommandLine() + configParser = _getConfig(configFile) + _setJavaProperties(configParser) + + catalog = GenericFileManagerObjectFactory.getCatalogServiceFromFactory(configParser.get('factories', 'catalog')) + repo = GenericFileManagerObjectFactory.getRepositoryManagerServiceFromFactory(configParser.get('factories', 'repository')) + xfer = GenericFileManagerObjectFactory.getDataTransferServiceFromFactory(configParser.get('factories', 'datatransfer')) + userDB = _getUserDB(configParser.get('policies', 'user')) + + ws = SecureWebServer(1999) + ws.addDispatcher(FileMgrDispatcher(catalog, repo, xfer, userDB)) + ws.start() + + +if __name__ == '__main__': + main() + http://git-wip-us.apache.org/repos/asf/oodt/blob/cb0c8829/agility/oodt/fm.py ---------------------------------------------------------------------- diff --git a/agility/oodt/fm.py b/agility/oodt/fm.py deleted file mode 100644 index 5eed046..0000000 --- a/agility/oodt/fm.py +++ /dev/null @@ -1,554 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -#/* -# * Licensed to the Apache Software Foundation (ASF) under one or more -# * contributor license agreements. See the NOTICE file distributed with -# * this work for additional information regarding copyright ownership. -# * The ASF licenses this file to You under the Apache License, Version 2.0 -# * (the "License"); you may not use this file except in compliance with -# * the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ -''' -CAS Filemgr Python Server - -This is the Python server API for an authenticated Catalog and Archive System. It uses the -OODT Catalog and Archive System as its core, adding user/password-based authentication -and role-based authorization. -''' - -import sys, os, os.path -import getopt, sha, pickle - -from ConfigParser import ConfigParser -from org.apache.oodt.cas.filemgr.system.auth import SecureWebServer, Dispatcher, Result -from org.apache.oodt.cas.filemgr.datatransfer import TransferStatusTracker -from org.apache.oodt.cas.filemgr.structs import Product -from org.apache.oodt.cas.filemgr.util import GenericFileManagerObjectFactory -from org.apache.oodt.cas.filemgr.util import XmlRpcStructFactory as Structs -from org.apache.oodt.cas.metadata import Metadata -from java.lang import Boolean, Double, Integer -from java.util import Hashtable, Vector - -# We choose these default factory classes because it minimizes our dependencies -# on heavyweight external packages, like smelly old SQL databases. -_defaultFactories = { - 'catalog': 'org.apache.oodt.cas.filemgr.catalog.LuceneCatalogFactory', - 'repository': 'org.apache.oodt.cas.filemgr.repository.XMLRepositoryManagerFactory', - 'datatransfer': 'org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory', - 'validation': 'org.apache.oodt.cas.filemgr.validation.XMLValidationLayerFactory' -} - -# All available permissions. By default, the "root" user will be in the "wheel" -# group and will have these permissions. -_allPerms = [ - 'filemgr.addMetadata', - 'filemgr.addProductReferences', - 'filemgr.addProductType', - 'filemgr.catalogProduct', - 'filemgr.getCurrentFileTransfer', - 'filemgr.getCurrentFileTransfers', - 'filemgr.getElementById', - 'filemgr.getElementByName', - 'filemgr.getElementsByProductType', - 'filemgr.getFirstPage', - 'filemgr.getLastPage', - 'filemgr.getMetadata', - 'filemgr.getNextPage', - 'filemgr.getNumProducts', - 'filemgr.getPrevPage', - 'filemgr.getProductById', - 'filemgr.getProductByName', - 'filemgr.getProductPctTransferred', - 'filemgr.getProductReferences', - 'filemgr.getProductsByProductType', - 'filemgr.getProductTypeById', - 'filemgr.getProductTypeByName', - 'filemgr.getProductTypes', - 'filemgr.getRefPctTransferred', - 'filemgr.getTopNProducts', - 'filemgr.handleRequest', - 'filemgr.hasProduct', - 'filemgr.ingestProduct', - 'filemgr.isTransferComplete', - 'filemgr.pagedQuery', - 'filemgr.query', - 'filemgr.removeFile', - 'filemgr.removeProductTransferStatus', - 'filemgr.setProductTransferStatus', - 'filemgr.transferFile', - 'filemgr.transferringProduct', - 'usermgr.addGroup', - 'usermgr.addPermissionToGroup', - 'usermgr.addUser', - 'usermgr.addUserToGroup', - 'usermgr.removeGroup', - 'usermgr.removePermissionFromGroup', - 'usermgr.removeUser', - 'usermgr.removeUserFromGroup' -] - -def _toJavaBoolean(truthiness): - '''Convert a Python boolean into the string format that Java uses: true or false. - ''' - if truthiness: - return 'true' - else: - return 'false' - - -def _encodePassword(pw): - '''Encode a password using an SHA-1 digest. - ''' - return sha.new(pw).digest() - - -class User: - '''A user of the CAS. Users don't have permissions directly; instead they receive - them implicitly by being members of groups, which do have permissions. - ''' - def __init__(self, userID, name, email, password, groups=[]): - self.userID, self.name, self.email, self.password, self.groups = userID, name, email, password, groups - - def __cmp__(self, other): - return cmp(self.userID, other.userID) - - def __hash__(self): - return hash(self.userID) - - def __repr__(self): - return 'User(userID=%s,name=%s,email=%s,password=%s,groups=%r)' % ( - self.userID, self.name, self.email, self.password, self.groups - ) - - -class Group: - '''A CAS group. The group contains a sequence of permissions, which are strings - that name the XML-RPC methods that the group is allowed to call. - ''' - def __init__(self, groupID, name, email, perms=[]): - self.groupID, self.name, self.email, self.perms = groupID, name, email, perms - - def __cmp__(self, other): - return cmp(self.groupID, other.groupID) - - def __hash__(self): - return hash(self.groupID) - - def __repr__(self): - return 'Group(groupdID=%s,name=%s,email=%s,perms=%r)' % (self.groupdID, self.name, self.email, self.perms) - - -class UserDB: - '''The user database records all the users and groups. - ''' - def __init__(self, users, groups, filename): - self.users, self.groups, self.filename = users, groups, filename - - def authenticate(self, name, password): - '''Authenticate a user by checking the user name and password. - Return true if the user's password matches the given one. The - password given should be in SHA-1 digest format. - ''' - user = self.users[name] - return user.userID == name and user.password == password - - def authorize(self, name, perm): - '''Authorize if the user has the given permission. Return true if - the user can do it, false otherwise. - ''' - user = self.users[name] - for group in user.groups: - if perm in group.perms: - return True - return False - - def save(self): - '''Save the user database to disk. - ''' - f = file(self.filename, 'wb') - pickle.dump(self, f) - f.close() - - -class FileMgrDispatcher(Dispatcher): - '''The file manager dispatcher handles all XML-RPC calls. - ''' - def __init__(self, catalog, repo, xfer, userDB): - self.catalog, self.repo, self.xfer, self.userDB = catalog, repo, xfer, userDB - self.tracker = TransferStatusTracker(self.catalog) - - def handleRequest(self, methodSpecifier, params, user, password): - '''Handle an XML-RPC request. First, authenticate the user. If the user's - authentic (by dint of providing a correct user ID and password pair), then - authorize if the method the user is trying to call is available. - ''' - password = _encodePassword(password) - if self.userDB.authenticate(user, password): - if self.userDB.authorize(user, methodSpecifier): - obj, method = methodSpecifier.split('.') - if obj not in ('filemgr', 'usermgr'): - raise ValueError('Unknown object') - func = getattr(self, method) - return func(params) - raise ValueError('Not authorized for "%s"' % methodSpecifier) - raise ValueError('Illegal user name "%s" and/or password' % user) - - def getProductTypeByName(self, params): - return Result(None, Structs.getXmlRpcProductType(self.repo.getProductTypeByName(params[0]))) - - def ingestProduct(self, params): - productHash, metadata, clientXfer = params - p = Structs.getProductFromXmlRpc(productHash) - p.setTransferStatus(Product.STATUS_TRANSFER) - self.catalog.addProduct(p) - - m = Metadata() - m.addMetadata(metadata) - self.catalog.addMetadata(m, p) - - if not clientXfer: - versioner = GenericFileManagerObjectFactory.getVersionerFromClassName(p.getProductType().getVersioner()) - versioner.createDataStoreReferences(p, m) - self.catalog.addProductReferences(p) - self.xfer.transferProduct(p) - p.setTransferStatus(Product.STATUS_RECEIVED) - self.catalog.setProductTranfserStatus(p) - return Result(None, p.getProductId()) - - def addProductReferences(self, params): - self.catalog.addProductReferences(Structs.getProductFromXmlRpc(params[0])) - return Result(Boolean, 'true') - - def transferringProduct(self, params): - self.tracker.transferringProduct(Structs.getProductFromXmlRpc(params[0])) - return Result(Boolean, 'true') - - def removeProductTransferStatus(self, params): - self.tracker.removeProductTransferStatus(Structs.getProductFromXmlRpc(params[0])) - return Result(Boolean, 'true') - - def setProductTransferStatus(self, params): - self.catalog.setProductTransferStatus(Structs.getProductFromXmlRpc(params[0])) - return Result(Boolean, 'true') - - def getCurrentFileTransfer(self, params): - status = self.tracker.getCurrentFileTransfer() - if status is None: - return Result(None, Hashtable()) - else: - return Result(None, Structs.getXmlRpcFileTransferStatus(status)) - - def getCurrentFileTransfers(self, params): - xfers = self.tracker.getCurrentFileTransfers() - if xfers is not None and len(xfers) > 0: - return Result(None, Structs.getXmlRpcFileTransferStatuses(xfers)) - else: - return Result(None, Vector()) - - def getProductPctTransferred(self, params): - return Result(Double, str(self.tracker.getPctTransferred((Structs.getProductFromXmlRpc(params[0]))))) - - def getRefPctTransferred(self, params): - pct = self.tracker.getPctTransferred(Structs.getReferenceFromXmlRpc(params[0])) - return Result(Double, str(pct)) - - def isTransferComplete(self, params): - return Result(Boolean, _toJavaBoolean(self.tracker.isTransferComplete(Structs.getProductFromXmlRpc(params[0])))) - - def pagedQuery(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[1]) - query = Structs.getQueryFromXmlRpc(params[0]) - return Result(None, Structs.getXmlRpcProductPage(self.catalog.pagedQuery(query, ptype, params[2]))) - - def getFirstPage(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - return Result(None, Structs.getXmlRpcProductPage(self.catalog.getFirstPage(ptype))) - - def getLastPage(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - return Result(None, Structs.getXmlRpcProductPage(self.catalog.getLastProductPage(ptype))) - - def getNextPage(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - page = Structs.getProductPageFromXmlRpc(params[1]) - return Result(None, Structs.getXmlRpcProductPage(self.catalog.getNextPage(ptype, page))) - - def getPrevPage(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - page = Structs.getProductPageFromXmlRpc(params[1]) - return Result(None, Structs.getXmlRpcProductPage(self.catalog.getPrevPage(ptype, page))) - - def addProductType(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - self.repo.addProductType(ptype) - return Result(None, ptype.getProductTypeId()) - - def getNumProducts(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - return Result(Integer, str(self.catalog.getNumProducts(ptype))) - - def getTopNProducts(self, params): - if len(params) == 1: - return Result(None, Structs.getXmlRpcProductList(self.catalog.getTopNProducts(params[0]))) - ptype = Structs.getProductTypeFromXmlRpc(params[1]) - return Result(None, Structs.getXmlRpcProductList(self.catalog.getTopNProducts(params[0], ptype))) - - def hasProduct(self, params): - p = self.catalog.getProductByName(params[0]) - return Result(Boolean, _toJavaBoolean(p is not None and p.transferStatus == Product.STATUS_RECEIVED)) - - def getMetadata(self, params): - return Result(None, self.catalog.getMetadata(Structs.getProductFromXmlRpc(params[0])).getHashtable()) - - def getProductTypes(self, params): - return Result(None, Structs.getXmlRpcProductList(self.repo.getProductTypes())) - - def getProductReferences(self, params): - p = Structs.getProductFromXmlRpc(params[0]) - return Result(None, Structs.getXmlRpcReferences(self.catalog.getProductReferences(p))) - - def getProductById(self, params): - return Result(None, Structs.getXmlRpcProduct(self.catalog.getProductById(params[0]))) - - def getProductByName(self, params): - return Result(None, Structs.getXmlRpcProduct(self.catalog.getProductByName(params[0]))) - - def getProductsByProductType(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - return Result(None, Structs.getXmlRpcProductList(self.catalog.getProductsByProductType(ptype))) - - def getElementsByProductType(self, params): - ptype = Structs.getProductTypeFromXmlRpc(params[0]) - return Structs.getXmlRpcElementList(self.catalog.getValidationLayer().getElements(ptype)) - - def getElementById(self, params): - return Structs.getXmlRpcElement(self.catalog.getValidationLayer().getElementById(params[0])) - - def getElementByName(self, params): - return Structs.getXmlRpcElement(self.catalog.getValidationLayer().getElementByName(params[0])) - - def query(self, params): - q = Structs.getQueryFromXmlRpc(params[0]) - ptype = Structs.getProductTypeFromXmlRpc(params[1]) - ids = self.catalog.query(q, ptype) - if ids is not None and len(ids) > 0: - return Result(None, [self.catalog.getProductById(i) for i in ids]) - return Result(None, Vector()) - - def getProductTypeById(self, params): - ptype = self.repo.getProductTypeById(params[0]) - return Result(None, Structs.getXmlRpcProductType(ptype)) - - def catalogProduct(self, params): - p = Structs.getProductFromXmlRpc(params[0]) - return Result(None, self.catalog.addProduct(p)) - - def addMetadata(self, params): - p = Structs.getProductFromXmlRpc(params[0]) - m = Metadata() - m.addMetadata(params[1]) - self.catalog.addMetadata(m, p) - return Result(Boolean, 'true') - - def transferFile(self, params): - outFile, data, offset, numBytes = params - if os.path.exists(outFile): - out = file(outFile, 'ab') - else: - dirPath = os.dirname(outFile) - os.makedirs(dirPath) - out = file(outFile, 'wb') - out.seek(offset) - out.write(data) - out.close() - return Result(Boolean, 'true') - - def removeFile(self, params): - os.remove(params[0]) - return Result(Boolean, 'true') - - def addUser(self, params): - userID = params[0] - user = User(userID, params[1], params[2], _encodePassword(params[3]), []) - self.userDB.users[userID] = user - self.userDB.save() - return Result(Boolean, 'true') - - def removeUser(self, params): - del self.userDB.users[params[0]] - return Result(Boolean, 'true') - - def addGroup(self, params): - groupID = params[0] - group = Group(groupID, params[1], params[2]) - self.userDB.groups[groupID] = group - self.userDB.save() - return Result(Boolean, 'true') - - def removeGroup(self, params): - groupID = params[0] - del self.userDB.groups[groupID] - for user in self.userDB.users.itervalues(): - indexes = [] - index = 0 - for group in user.groups: - if group.groupID == groupID: - indexes.append(index) - index += 1 - indexes.reverse() - for index in indexes: - del user.groups[index] - self.userDB.save() - return Result(Boolean, 'true') - - def addUserToGroup(self, params): - self.userDB.users[params[0]].groups.append(self.userDB.groups[params[1]]) - return Result(Boolean, 'true') - - def removeUserFromGroup(self, params): - groupID = params[1] - user = self.userDB.users[params[0]] - indexes = [] - index = 0 - for group in user.group: - if group.groupID == groupID: - indexes.append(index) - index += 1 - indexes.reverse() - for index in indexes: - del user.groups[index] - self.userDB.save() - return Result(Boolean, 'true') - - def addPermissionToGroup(self, params): - self.userDB.groups[params[0]].perms.append(params[1]) - self.userDB.save() - return Result(Boolean, 'true') - - def removePermissionFromGroup(self, params): - permName = params[1] - group = self.userDB.groups[params[0]] - indexes = [] - index = 0 - for perm in group.perms: - if perm == permName: - indexes.append(index) - indexes.reverse() - for index in indexes: - del group.perms[index] - self.userDB.save() - return Result(Boolean, 'true') - - -def _usage(): - '''Show a usage message to the stderr and quit. - ''' - print >>sys.stderr, 'Usage: %s [-c <configFile>]' % sys.argv[0] - print >>sys.stderr, ' or: %s [--config=<configFile>]' % sys.argv[0] - sys.exit(2) - - -def _parseCommandLine(): - '''Parse the command line options. If any. The only option is -c (or --config) - that names a configuration file to use. If none given, reasonable defaults are - used. Well, mostly reasonable. - ''' - try: - opts, args = getopt.getopt(sys.argv[1:], 'c:', 'config=') - except getopt.GetoptError: - _usage() - configFile = None - for option, arg in opts: - if option in ('-c', '--config'): - configFile = arg - if configFile is None or len(configFile) == 0: - _usage() - return configFile - - -def _getConfig(configFile): - '''Get the configuration. This populates a configuration with default values - and then overwrites them with the configFile, which may be None (in which case, - no overwriting happens). - ''' - configParser = ConfigParser() - configParser.add_section('factories') - for key, val in _defaultFactories.iteritems(): - configParser.set('factories', key, val) - - configParser.add_section('index') - configParser.set('index', 'path', 'index') - configParser.set('index', 'pageSize', '20') - - current = '/'.join(os.path.split(os.getcwd())) - configParser.add_section('policies') - configParser.set('policies', 'repo', 'file:%s/policy' % current) - configParser.set('policies', 'validation', 'file:%s/policy' % current) - configParser.set('policies', 'user', '%s/user.db' % current) - - if configFile is not None: - configParser.readfp(file(configFile)) - return configParser - - -def _setJavaProperties(config): - '''Set Java-based properties. The Java-based cas expects a whole bunch of - system properties to be set, sort of like global variables. Woot! Global - variables! - ''' - from java.lang import System - System.setProperty('org.apache.oodt.cas.filemgr.catalog.lucene.idxPath', config.get('index', 'path')) - System.setProperty('org.apache.oodt.cas.filemgr.catalog.lucene.pageSize', config.get('index', 'pageSize')) - System.setProperty('org.apache.oodt.cas.filemgr.repositorymgr.dirs', config.get('policies', 'repo')) - System.setProperty('org.apache.oodt.cas.filemgr.validation.dirs', config.get('policies', 'validation')) - System.setProperty('org.apache.oodt.cas.filemgr.datatransfer.remote.chunkSize', '1024') - System.setProperty('filemgr.repository.factory', config.get('factories', 'repository')) - System.setProperty('filemgr.catalog.factory', config.get('factories', 'catalog')) - System.setProperty('filemgr.datatransfer.factory', config.get('factories', 'datatransfer')) - System.setProperty('filemgr.validationLayer.factory', config.get('factories', 'validation')) - -def _getUserDB(path): - '''Get the user database, creating it if necessary. - ''' - try: - f = file(path, 'rb') - db = pickle.load(f) - f.close() - db.filename = path - db.save() - except: - wheel = Group('wheel', 'Administrators', '[email protected]', _allPerms) - root = User('root', 'Super User', '[email protected]', _encodePassword('poipu'), [wheel]) - db = UserDB({'root': root}, {'wheel': wheel}, path) - db.save() - return db - - -def main(): - '''Start the CAS Filemgr Backend. - ''' - configFile = _parseCommandLine() - configParser = _getConfig(configFile) - _setJavaProperties(configParser) - - catalog = GenericFileManagerObjectFactory.getCatalogServiceFromFactory(configParser.get('factories', 'catalog')) - repo = GenericFileManagerObjectFactory.getRepositoryManagerServiceFromFactory(configParser.get('factories', 'repository')) - xfer = GenericFileManagerObjectFactory.getDataTransferServiceFromFactory(configParser.get('factories', 'datatransfer')) - userDB = _getUserDB(configParser.get('policies', 'user')) - - ws = SecureWebServer(1999) - ws.addDispatcher(FileMgrDispatcher(catalog, repo, xfer, userDB)) - ws.start() - - -if __name__ == '__main__': - main() -
