Registry-based persistence - refactored the RegistryManager and PersistenceManager
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/fd6b573e Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/fd6b573e Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/fd6b573e Branch: refs/heads/master Commit: fd6b573e93090b28b0be159d04aca29a380983d5 Parents: b3238bd Author: Isuru <[email protected]> Authored: Fri Dec 27 00:36:09 2013 +0530 Committer: Isuru <[email protected]> Committed: Fri Dec 27 00:36:09 2013 +0530 ---------------------------------------------------------------------- .../adc/mgt/persistence/PersistenceManager.java | 9 +- .../RegistryBasedPersistenceManager.java | 137 +++++++++++----- .../adc/mgt/registry/RegistryManager.java | 158 +++++++++++++------ 3 files changed, 217 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fd6b573e/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java index 09ea562..e3afaf2 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/PersistenceManager.java @@ -22,7 +22,7 @@ package org.apache.stratos.adc.mgt.persistence; import org.apache.stratos.adc.mgt.exception.PersistenceManagerException; import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription; -import java.util.Collection; +import java.util.List; public abstract class 
PersistenceManager { @@ -35,13 +35,16 @@ public abstract class PersistenceManager { public abstract CartridgeSubscription getCartridgeSubscription(int tenantId, String alias) throws PersistenceManagerException; - public abstract Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId) + public abstract List<CartridgeSubscription> getCartridgeSubscriptions() + throws PersistenceManagerException; + + public abstract List<CartridgeSubscription> getCartridgeSubscriptions(int tenantId) throws PersistenceManagerException; public abstract CartridgeSubscription getCartridgeSubscription (String clusterDomain) throws PersistenceManagerException; - public abstract Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId, String cartridgeType) + public abstract List<CartridgeSubscription> getCartridgeSubscriptions(int tenantId, String cartridgeType) throws PersistenceManagerException; /*public abstract Repository getRepository (int tenantId, String alias) http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fd6b573e/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java index 2cd1cd5..0db3c19 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/persistence/RegistryBasedPersistenceManager.java @@ -21,58 +21,81 @@ package org.apache.stratos.adc.mgt.persistence; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import 
org.apache.stratos.adc.mgt.exception.ADCException; import org.apache.stratos.adc.mgt.exception.PersistenceManagerException; import org.apache.stratos.adc.mgt.lookup.ClusterIdToSubscription; import org.apache.stratos.adc.mgt.lookup.SubscriptionContext; import org.apache.stratos.adc.mgt.registry.RegistryManager; import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription; import org.apache.stratos.adc.mgt.utils.Deserializer; +import org.apache.stratos.adc.mgt.utils.Serializer; import org.wso2.carbon.registry.core.exceptions.RegistryException; -import java.util.Collection; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; public class RegistryBasedPersistenceManager extends PersistenceManager { private static final Log log = LogFactory.getLog(RegistryBasedPersistenceManager.class); + // Registry paths + private static final String STRATOS_MANAGER_REOSURCE = "/stratos.manager"; + private static final String CLUSTER_ID_TO_SUBSCRIPTION = "/clusterIdToSubscription"; + private static final String TENANT_ID_TO_SUBSCRIPTION_CONTEXT = "/tenantIdToSubscriptionContext"; @Override - public void persistCartridgeSubscription(CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException { + public void persistCartridgeSubscription (CartridgeSubscription cartridgeSubscription) throws PersistenceManagerException { SubscriptionContext subscriptionContext = new SubscriptionContext(); subscriptionContext.addSubscription(cartridgeSubscription); + //TODO: need to synchronize? 
+ // persist in the path TENANT_ID_TO_SUBSCRIPTION_CONTEXT try { - RegistryManager.getInstance().persistSubscriptionContext(cartridgeSubscription.getSubscriber().getTenantId(), subscriptionContext); + RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" + + Integer.toString(cartridgeSubscription.getSubscriber().getTenantId()), Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext)); } catch (RegistryException e) { throw new PersistenceManagerException(e); - } catch (ADCException e) { + } catch (IOException e) { + throw new PersistenceManagerException(e); + } + + // persist in the path CLUSTER_ID_TO_SUBSCRIPTION + try { + RegistryManager.getInstance().persist(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION + "/" + + cartridgeSubscription.getClusterDomain(), Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext)); + + } catch (RegistryException e) { + throw new PersistenceManagerException(e); + + } catch (IOException e) { throw new PersistenceManagerException(e); } } @Override - public void removeCartridgeSubscription(int tenantId, String alias) throws PersistenceManagerException { + public void removeCartridgeSubscription (int tenantId, String alias) throws PersistenceManagerException { //TODO } @Override - public CartridgeSubscription getCartridgeSubscription(int tenantId, String alias) throws PersistenceManagerException { + public CartridgeSubscription getCartridgeSubscription (int tenantId, String alias) throws PersistenceManagerException { Object byteObj; try { - byteObj = RegistryManager.getInstance().getSubscriptionContext(tenantId); - - } catch (ADCException e) { - throw new PersistenceManagerException(e); + byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" + + Integer.toString(tenantId)); } catch (RegistryException e) { throw new PersistenceManagerException(e); } + if (byteObj == null) { + return null; 
+ } + Object subscriptionContextObj; try { @@ -92,15 +115,65 @@ public class RegistryBasedPersistenceManager extends PersistenceManager { } @Override - public Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId) throws PersistenceManagerException { + public List<CartridgeSubscription> getCartridgeSubscriptions () throws PersistenceManagerException { - Object byteObj; + Object resourceObj; try { - byteObj = RegistryManager.getInstance().getSubscriptionContext(tenantId); + resourceObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT); - } catch (ADCException e) { + } catch (RegistryException e) { throw new PersistenceManagerException(e); + } + + if ((resourceObj == null) || !(resourceObj instanceof String[])) { + return null; + } + + // get the paths for all SubscriptionContext instnaces + String[] subscriptionCtxtResourcePaths = (String[]) resourceObj; + + List<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>(); + //for each path, get the SubscriptionContext instance + for (String subscriptionCtxResourcePath : subscriptionCtxtResourcePaths) { + + Object serializedSubscriptionCtxObj = null; + try { + serializedSubscriptionCtxObj = RegistryManager.getInstance().retrieve(subscriptionCtxResourcePath); + + } catch (RegistryException e) { + // issue might be at only this path, therefore log and continue + log.error("Error while retrieving Resource at " + subscriptionCtxResourcePath, e); + continue; + } + + //De-serialize + Object subscriptionCtxObj = null; + try { + subscriptionCtxObj = Deserializer.deserializeFromByteArray((byte[]) serializedSubscriptionCtxObj); + + } catch (Exception e) { + // issue might be de-serializing only this object, therefore log and continue + log.error("Error while de-serializing the object retrieved from " + subscriptionCtxResourcePath, e); + continue; + } + + if (subscriptionCtxObj != null && subscriptionCtxObj instanceof 
SubscriptionContext) { + SubscriptionContext subscriptionContext = (SubscriptionContext) subscriptionCtxObj; + cartridgeSubscriptions.addAll(subscriptionContext.getSubscriptions()); + } + } + + return cartridgeSubscriptions; + } + + @Override + public List<CartridgeSubscription> getCartridgeSubscriptions (int tenantId) throws PersistenceManagerException { + + Object byteObj; + + try { + byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId)); } catch (RegistryException e) { throw new PersistenceManagerException(e); @@ -115,25 +188,22 @@ public class RegistryBasedPersistenceManager extends PersistenceManager { throw new PersistenceManagerException(e); } - SubscriptionContext subscriptionContext; + List<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>(); if (subscriptionContextObj instanceof SubscriptionContext) { - subscriptionContext = (SubscriptionContext) subscriptionContextObj; - return subscriptionContext.getSubscriptions(); + //get all Subscriptions for this tenant + cartridgeSubscriptions.addAll(((SubscriptionContext) subscriptionContextObj).getSubscriptions()); } - return null; + return cartridgeSubscriptions; } @Override - public CartridgeSubscription getCartridgeSubscription(String clusterDomain) throws PersistenceManagerException { + public CartridgeSubscription getCartridgeSubscription (String clusterDomain) throws PersistenceManagerException { Object byteObj; try { - byteObj = RegistryManager.getInstance().getClusterIdToSubscription(); - - } catch (ADCException e) { - throw new PersistenceManagerException(e); + byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + CLUSTER_ID_TO_SUBSCRIPTION + "/" + clusterDomain); } catch (RegistryException e) { throw new PersistenceManagerException(e); @@ -148,25 +218,20 @@ public class RegistryBasedPersistenceManager extends PersistenceManager { throw new 
PersistenceManagerException(e); } - ClusterIdToSubscription clusterIdToSubscription; if (clusterIdToSubscriptionObj instanceof ClusterIdToSubscription) { - clusterIdToSubscription = (ClusterIdToSubscription) clusterIdToSubscriptionObj; - return clusterIdToSubscription.getSubscription(clusterDomain); + ((ClusterIdToSubscription) clusterIdToSubscriptionObj).getSubscription(clusterDomain); } return null; } @Override - public Collection<CartridgeSubscription> getCartridgeSubscriptions(int tenantId, String cartridgeType) throws PersistenceManagerException { + public List<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String cartridgeType) throws PersistenceManagerException { Object byteObj; try { - byteObj = RegistryManager.getInstance().getSubscriptionContext(tenantId); - - } catch (ADCException e) { - throw new PersistenceManagerException(e); + byteObj = RegistryManager.getInstance().retrieve(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId)); } catch (RegistryException e) { throw new PersistenceManagerException(e); @@ -181,12 +246,12 @@ public class RegistryBasedPersistenceManager extends PersistenceManager { throw new PersistenceManagerException(e); } - SubscriptionContext subscriptionContext; + List<CartridgeSubscription> cartridgeSubscriptions = new ArrayList<CartridgeSubscription>(); if (subscriptionContextObj instanceof SubscriptionContext) { - subscriptionContext = (SubscriptionContext) subscriptionContextObj; - return subscriptionContext.getSubscriptionsOfType(cartridgeType); + //get all Subscriptions for this tenant and the type + cartridgeSubscriptions.addAll(((SubscriptionContext) subscriptionContextObj).getSubscriptionsOfType(cartridgeType)); } - return null; + return cartridgeSubscriptions; } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fd6b573e/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java 
---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java index 707d772..821418a 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/registry/RegistryManager.java @@ -21,11 +21,7 @@ package org.apache.stratos.adc.mgt.registry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.adc.mgt.exception.ADCException; import org.apache.stratos.adc.mgt.internal.DataHolder; -import org.apache.stratos.adc.mgt.lookup.ClusterIdToSubscription; -import org.apache.stratos.adc.mgt.lookup.SubscriptionContext; -import org.apache.stratos.adc.mgt.utils.Serializer; import org.wso2.carbon.registry.core.Resource; import org.wso2.carbon.registry.core.exceptions.RegistryException; import org.wso2.carbon.registry.core.exceptions.ResourceNotFoundException; @@ -36,20 +32,16 @@ public class RegistryManager { private final static Log log = LogFactory.getLog(RegistryManager.class); ; - private final static String STRATOS_MANAGER_REOSURCE = "/stratos.manager"; - private final static String CLUSTER_ID_TO_SUBSCRIPTION = "/clusterIdToSubscription"; - private final static String TENENTID_TO_SUBSCRIPTION_CONTEXT = "/tenantIdToSubscriptionContext"; + private static final String STRATOS_MANAGER_REOSURCE = "/stratos.manager"; private static RegistryService registryService; private static volatile RegistryManager registryManager; public static RegistryManager getInstance() { - registryService = DataHolder.getRegistryService(); - if (registryManager == null) { synchronized (RegistryManager.class) { - if (registryService == null) { + if (registryManager == 
null) { return registryManager; } registryManager = new RegistryManager(); @@ -59,60 +51,64 @@ public class RegistryManager { } private RegistryManager() { - + registryService = DataHolder.getRegistryService(); } - private UserRegistry initRegistry (int tenantId) throws RegistryException, ADCException { + private UserRegistry initRegistry (int tenantId) throws RegistryException { UserRegistry tenantGovRegistry = registryService.getGovernanceSystemRegistry(tenantId); - if (tenantGovRegistry == null) { + /*if (tenantGovRegistry == null) { String errorMsg = "Tenant " + tenantId + "'s governance registry is not initialized"; log.error(errorMsg); throw new ADCException(errorMsg); - } + }*/ - synchronized (RegistryManager.class) { - // check if the resource is available, else create it - try { - if (!tenantGovRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) { - tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE, tenantGovRegistry.newCollection()); + // check if the resource is available, else create it + if (!tenantGovRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) { + synchronized (RegistryManager.class) { + try { + if (!tenantGovRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) { + tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE, tenantGovRegistry.newCollection()); + } + } catch (RegistryException e) { + String errorMsg = "Failed to create the registry resource " + STRATOS_MANAGER_REOSURCE; + log.error(errorMsg, e); + throw e; } - } catch (RegistryException e) { - String errorMsg = "Failed to create the registry resource " + STRATOS_MANAGER_REOSURCE; - log.error(errorMsg, e); - throw new ADCException(errorMsg, e); } } return tenantGovRegistry; } - private UserRegistry initRegistry () throws RegistryException, ADCException { + private UserRegistry initRegistry () throws RegistryException { UserRegistry govRegistry = registryService.getGovernanceSystemRegistry(); - if (govRegistry == null) { + /*if (govRegistry == null) { String errorMsg = "Governance registry is not 
initialized"; log.error(errorMsg); throw new ADCException(errorMsg); - } + }*/ - synchronized (RegistryManager.class) { - // check if the resource is available, else create it - try { - if (!govRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) { - govRegistry.put(STRATOS_MANAGER_REOSURCE, govRegistry.newCollection()); + // check if the resource is available, else create it + if (!govRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) { + synchronized (RegistryManager.class) { + try { + if (!govRegistry.resourceExists(STRATOS_MANAGER_REOSURCE)) { + govRegistry.put(STRATOS_MANAGER_REOSURCE, govRegistry.newCollection()); + } + } catch (RegistryException e) { + String errorMsg = "Failed to create the registry resource " + STRATOS_MANAGER_REOSURCE; + log.error(errorMsg, e); + throw e; } - } catch (RegistryException e) { - String errorMsg = "Failed to create the registry resource " + STRATOS_MANAGER_REOSURCE; - log.error(errorMsg, e); - throw new ADCException(errorMsg, e); } } return govRegistry; } - public void persistSubscriptionContext(int tenantId, SubscriptionContext subscriptionContext) + /*public void persistSubscriptionContext (int tenantId, SubscriptionContext subscriptionContext) throws RegistryException, ADCException { //TODO: uncomment @@ -124,10 +120,7 @@ public class RegistryManager { tenantGovRegistry.beginTransaction(); Resource nodeResource = tenantGovRegistry.newResource(); nodeResource.setContent(Serializer.serializeSubscriptionSontextToByteArray(subscriptionContext)); - //TODO: uncomment - //tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE + TENENTID_TO_SUBSCRIPTION_CONTEXT, nodeResource); - //temporary - tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE + TENENTID_TO_SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId), nodeResource); + tenantGovRegistry.put(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT, nodeResource); tenantGovRegistry.commitTransaction(); } catch (Exception e) { @@ -138,11 +131,12 @@ public class RegistryManager { } } 
+ //TODO: retun the de-serialized object public Object getSubscriptionContext(int tenantId) throws ADCException, RegistryException { //TODO: uncomment //UserRegistry tenantGovRegistry = registryService.getGovernanceSystemRegistry(tenantId); - //temprary + //temporary UserRegistry tenantGovRegistry = registryService.getGovernanceSystemRegistry(); if (tenantGovRegistry == null) { @@ -152,14 +146,11 @@ public class RegistryManager { } try { - //TODO: uncomment - //Resource resource = tenantGovRegistry.get(STRATOS_MANAGER_REOSURCE + TENENTID_TO_SUBSCRIPTION_CONTEXT); - //temporary - Resource resource = tenantGovRegistry.get(STRATOS_MANAGER_REOSURCE + TENENTID_TO_SUBSCRIPTION_CONTEXT + "/" + Integer.toString(tenantId)); + Resource resource = tenantGovRegistry.get(STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT); return resource.getContent(); } catch (ResourceNotFoundException ignore) { - log.error("Sepcified resource not found at " + STRATOS_MANAGER_REOSURCE + TENENTID_TO_SUBSCRIPTION_CONTEXT); + log.error("Sepcified resource not found at " + STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT); return null; } catch (RegistryException e) { @@ -167,9 +158,33 @@ public class RegistryManager { log.error(errorMsg, e); throw new ADCException(errorMsg, e); } - } + }*/ + + /*public Object getSubscriptionContexts() throws RegistryException { - public void persistClusterIdToSubscription (ClusterIdToSubscription clusterIdToSubscription) + UserRegistry registry = registryService.getGovernanceSystemRegistry(); + return retrieve(registry, STRATOS_MANAGER_REOSURCE + TENANT_ID_TO_SUBSCRIPTION_CONTEXT); + + + if ((resourceObj == null) || !(resourceObj instanceof String[])) { + return null; + } + + // get the paths for all SubscriptionContext instnaces + String[] subscriptionCtxtResourcePaths = (String[]) resourceObj; + + Collection<SubscriptionContext> cartridgeSubscriptionCtxts; + //for each path, get the SubscriptionContext instance + for (String 
subscriptionCtxResourcePath : subscriptionCtxtResourcePaths) { + Object subscriptionCtxObj = retrieve(registry, subscriptionCtxResourcePath); + if (subscriptionCtxObj != null && subscriptionCtxObj instanceof SubscriptionContext) { + + } + } + + }*/ + + /*public void persistClusterIdToSubscription (ClusterIdToSubscription clusterIdToSubscription) throws RegistryException, ADCException { UserRegistry govRegistry = initRegistry(); @@ -189,6 +204,7 @@ public class RegistryManager { } } + //TODO: retun the de-serialized object public Object getClusterIdToSubscription () throws ADCException, RegistryException { UserRegistry govRegistry = registryService.getGovernanceSystemRegistry(); @@ -210,5 +226,51 @@ public class RegistryManager { log.error(errorMsg, e); throw new ADCException(errorMsg, e); } + }*/ + + public void persist (String path, byte [] resourceBytes) throws RegistryException { + + UserRegistry registry = initRegistry(); + + try { + registry.beginTransaction(); + Resource nodeResource = registry.newResource(); + nodeResource.setContent(resourceBytes); + registry.put(path, nodeResource); + registry.commitTransaction(); + + } catch (RegistryException e) { + registry.rollbackTransaction(); + String errorMsg = "Failed to persist the given resource in registry path " + path; + log.error(errorMsg, e); + throw e; + } + } + + public Object retrieve (String resourcePath) throws RegistryException { + + UserRegistry registry = initRegistry(); + + Resource resource; + + try { + resource = registry.get(resourcePath); + + } catch (ResourceNotFoundException ignore) { + String errorMsg = "Resource not found at path " + resourcePath; + log.error(errorMsg); + return null; + + } catch (RegistryException e) { + String errorMsg = "Failed to retrieve the Resource at " + resourcePath + " from registry."; + log.error(errorMsg, e); + throw e; + } + + if(resource == null) { + return null; + } + + return resource.getContent(); } }
