Fix STRATOS-1585: Threads can override resource paths in metadata api registry
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4b8d439b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4b8d439b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4b8d439b Branch: refs/heads/stratos-4.1.x Commit: 4b8d439b34097715f9cddfab4948f4c14e472653 Parents: ade33aa Author: Akila Perera <[email protected]> Authored: Wed Oct 14 16:23:23 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Wed Oct 14 16:43:04 2015 +0530 ---------------------------------------------------------------------- .../service/MetadataTopologyEventReceiver.java | 95 ++++++++ .../metadata/service/api/MetadataApi.java | 56 +++-- .../service/registry/MetadataApiRegistry.java | 223 +++++++++++-------- 3 files changed, 255 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/4b8d439b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java new file mode 100644 index 0000000..f483995 --- /dev/null +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.metadata.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.concurrent.locks.ReadWriteLock; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.application.ApplicationCreatedEvent; +import org.apache.stratos.messaging.event.application.ApplicationDeletedEvent; +import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent; +import org.apache.stratos.messaging.event.topology.ApplicationClustersRemovedEvent; +import org.apache.stratos.messaging.listener.application.ApplicationCreatedEventListener; +import org.apache.stratos.messaging.listener.application.ApplicationDeletedEventListener; +import org.apache.stratos.messaging.listener.topology.ApplicationClustersCreatedEventListener; +import org.apache.stratos.messaging.listener.topology.ApplicationClustersRemovedEventListener; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.metadata.service.registry.MetadataApiRegistry; + +import java.util.concurrent.ExecutorService; + +/** + * Topology receiver class for metadata service + */ +public class MetadataTopologyEventReceiver { + private static final Log log = LogFactory.getLog(MetadataTopologyEventReceiver.class); + + private TopologyEventReceiver topologyEventReceiver; + private ExecutorService executorService; + public static final String METADATA_SERVICE_THREAD_POOL_ID = "metadata.service.thread.pool."; + + public MetadataTopologyEventReceiver() { + this.topologyEventReceiver = new TopologyEventReceiver(); + executorService = StratosThreadPool.getExecutorService(METADATA_SERVICE_THREAD_POOL_ID, 10); + addEventListeners(); + } + + private void addEventListeners() { + topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() { + @Override + protected void onEvent(Event event) { + ApplicationClustersCreatedEvent appClustersCreatedEvent = (ApplicationClustersCreatedEvent) event; + String applicationId = appClustersCreatedEvent.getAppId(); + MetadataApiRegistry.getApplicationIdToReadWriteLockMap() + .put(applicationId, new ReadWriteLock(METADATA_SERVICE_THREAD_POOL_ID.concat(applicationId))); + } + }); + + topologyEventReceiver.addEventListener(new ApplicationClustersRemovedEventListener() { + @Override + protected void onEvent(Event event) { + ApplicationClustersRemovedEvent appClustersRemovedEvent = (ApplicationClustersRemovedEvent) event; + String applicationId = appClustersRemovedEvent.getAppId(); + MetadataApiRegistry.getApplicationIdToReadWriteLockMap().remove(applicationId); + } + }); + } + + public void execute() { + topologyEventReceiver.setExecutorService(getExecutorService()); + topologyEventReceiver.execute(); + + if (log.isInfoEnabled()) { + log.info("Metadata service topology receiver started."); + } + } + + public void terminate() { + topologyEventReceiver.terminate(); + if (log.isInfoEnabled()) { + log.info("Metadata service topology receiver stopped."); + } + } + + public ExecutorService getExecutorService() { + return executorService; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/4b8d439b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java index 712bcaa..53174e0 100644 --- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java @@ -44,7 +44,13 @@ public class MetadataApi { * Meta data admin configuration loading */ public MetadataApi() { - registry = new MetadataApiRegistry(); + try { + registry = new MetadataApiRegistry(); + } catch (Exception e) { + String msg = "Could not initialize Metadata API"; + log.error(msg, e); + throw new RuntimeException(msg); + } } @GET @@ -57,8 +63,7 @@ public class MetadataApi { List<Property> properties; Property[] propertiesArr = null; try { - properties = registry - .getApplicationProperties(applicationId); + properties = registry.getApplicationProperties(applicationId); if (properties != null) { propertiesArr = new Property[properties.size()]; propertiesArr = properties.toArray(propertiesArr); @@ -83,13 +88,12 @@ public class MetadataApi { @Produces("application/json") @Consumes("application/json") public Response getClusterProperties(@PathParam("application_id") String applicationId, - @PathParam("cluster_id") String clusterId) throws RestAPIException { + @PathParam("cluster_id") String clusterId) throws RestAPIException { List<Property> properties; Property[] propertiesArr = null; try { - properties = registry - .getClusterProperties(applicationId, clusterId); + properties = registry.getClusterProperties(applicationId, clusterId); if (properties != null) { propertiesArr = new Property[properties.size()]; propertiesArr = properties.toArray(propertiesArr); @@ -114,11 +118,9 @@ public class MetadataApi { @Produces("application/json") @Consumes("application/json") public Response getApplicationProperty(@PathParam("application_id") String applicationId, - @PathParam("property_name") String propertyName) - throws RestAPIException { + @PathParam("property_name") String propertyName) throws RestAPIException { List<Property> properties; - Property property = null; try { properties = registry.getApplicationProperties(applicationId); @@ -151,8 +153,8 @@ public class MetadataApi { @Produces("application/json") @Consumes("application/json") public Response getClusterProperty(@PathParam("application_id") String applicationId, - @PathParam("cluster_id") String clusterId, - @PathParam("property_name") String propertyName) throws RestAPIException { + @PathParam("cluster_id") String clusterId, @PathParam("property_name") String propertyName) + throws RestAPIException { List<Property> properties; Property property = null; @@ -186,8 +188,7 @@ public class MetadataApi { @Path("applications/{application_id}/properties") @Produces("application/json") @Consumes("application/json") - public Response addPropertyToApplication(@PathParam("application_id") String applicationId, - Property property) + public Response addPropertyToApplication(@PathParam("application_id") String applicationId, Property property) throws RestAPIException { URI url = uriInfo.getAbsolutePathBuilder().path(applicationId).build(); @@ -206,8 +207,7 @@ public class MetadataApi { @Produces("application/json") @Consumes("application/json") public Response addPropertyToCluster(@PathParam("application_id") String applicationId, - @PathParam("cluster_id") String clusterId, Property property) - throws RestAPIException { + @PathParam("cluster_id") String clusterId, Property property) throws RestAPIException { URI url = uriInfo.getAbsolutePathBuilder().path(applicationId + "/" + clusterId).build(); try { @@ -230,9 +230,7 @@ public class MetadataApi { try { boolean deleted = registry.deleteApplicationProperties(applicationId); if (!deleted) { - log.warn(String.format( - "No metadata is associated with given appId %s", - applicationId)); + log.warn(String.format("No metadata is associated with given appId %s", applicationId)); } } catch (RegistryException e) { String msg = "Resource attached with appId could not be deleted [application-id] " + applicationId; @@ -247,18 +245,16 @@ public class MetadataApi { @Produces("application/json") @Consumes("application/json") public Response deleteApplicationProperty(@PathParam("application_id") String applicationId, - @PathParam("property_name") String propertyName) - throws RestAPIException { + @PathParam("property_name") String propertyName) throws RestAPIException { try { boolean deleted = registry.removePropertyFromApplication(applicationId, propertyName); if (!deleted) { - log.warn(String.format( - "No metadata is associated with given appId %s", - applicationId)); + log.warn(String.format("No metadata is associated with given appId %s", applicationId)); } } catch (RegistryException e) { - String msg = String.format("[application-id] %s [property-name] deletion failed ", applicationId, propertyName); + String msg = String + .format("[application-id] %s [property-name] %s deletion failed ", applicationId, propertyName); log.error(msg, e); throw new RestAPIException(msg, e); } @@ -270,20 +266,18 @@ public class MetadataApi { @Produces("application/json") @Consumes("application/json") public Response deleteApplicationPropertyValue(@PathParam("application_id") String applicationId, - @PathParam("property_name") String propertyName, - @PathParam("value") String propertyValue) + @PathParam("property_name") String propertyName, @PathParam("value") String propertyValue) throws RestAPIException { try { boolean deleted = registry.removePropertyValueFromApplication(applicationId, propertyName, propertyValue); if (!deleted) { - log.warn(String.format( - "No metadata is associated with given [application-id] %s", - applicationId)); + log.warn(String.format("No metadata is associated with given [application-id] %s", applicationId)); } } catch (RegistryException e) { - String msg = String.format("[application-id] %s [property-name] %s [value] %s deletion failed" + - applicationId, propertyName, propertyValue); + String msg = String + .format("[application-id] %s [property-name] %s [value] %s deletion failed" + applicationId, + propertyName, propertyValue); log.error(msg, e); throw new RestAPIException(msg, e); } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b8d439b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java index abd3f4b..f6b1ee4 100644 --- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java @@ -21,6 +21,8 @@ package org.apache.stratos.metadata.service.registry; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.concurrent.locks.ReadWriteLock; +import org.apache.stratos.metadata.service.MetadataTopologyEventReceiver; import org.apache.stratos.metadata.service.definition.Property; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.registry.core.Registry; @@ -33,7 +35,6 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Context; import java.util.*; - /** * Carbon registry implementation */ @@ -44,8 +45,12 @@ public class MetadataApiRegistry implements DataStore { private static Log log = LogFactory.getLog(MetadataApiRegistry.class); @Context HttpServletRequest httpServletRequest; + private static final Map<String, ReadWriteLock> applicationIdToReadWriteLockMap = new HashMap<>(); + private MetadataTopologyEventReceiver metadataTopologyEventReceiver; public MetadataApiRegistry() { + metadataTopologyEventReceiver = new MetadataTopologyEventReceiver(); + metadataTopologyEventReceiver.execute(); } public List<Property> getApplicationProperties(String applicationName) throws RegistryException { @@ -90,7 +95,6 @@ public class MetadataApiRegistry implements DataStore { Registry tempRegistry = getRegistry(); String resourcePath = mainResource + applicationName + "/" + clusterId; - if (!tempRegistry.resourceExists(resourcePath)) { return null; } @@ -123,7 +127,7 @@ public class MetadataApiRegistry implements DataStore { public void addPropertyToApplication(String applicationId, Property property) throws RegistryException { Registry registry = getRegistry(); String resourcePath = mainResource + applicationId; - + acquireWriteLock(applicationId); try { // We are using only super tenant registry to persist PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); @@ -135,7 +139,7 @@ public class MetadataApiRegistry implements DataStore { } else { nodeResource = registry.newCollection(); if (log.isDebugEnabled()) { - log.debug("Registry resource is create at path for application: " + nodeResource.getPath()); + log.debug("Registry resource created for application: " + applicationId); } } @@ -143,34 +147,39 @@ public class MetadataApiRegistry implements DataStore { for (String value : property.getValues()) { if (!propertyValueExist(nodeResource, property.getKey(), value)) { updated = true; - log.info(String.format("Registry property is added: [resource-path] %s " + - "[Property Name] %s [Property Value] %s", - resourcePath, property.getKey(), value)); + if (log.isDebugEnabled()) { + log.debug(String.format("Registry property is added: [resource-path] %s " + + "[Property Name] %s [Property Value] %s", resourcePath, property.getKey(), + value)); + } nodeResource.addProperty(property.getKey(), value); } else { - log.info(String.format("Property value already exist property=%s value=%s", property.getKey(), value)); + if (log.isDebugEnabled()) { + log.debug(String.format("Property value already exist property=%s value=%s", property.getKey(), + value)); + } } } - if (updated) { registry.put(resourcePath, nodeResource); + if (log.isDebugEnabled()) { + log.debug(String.format( + "Registry property is persisted: [resource-path] %s [Property Name] %s [Property Values] " + + "%s", resourcePath, property.getKey(), Arrays.asList(property.getValues()))); + } } - //registry.commitTransaction(); } catch (Exception e) { String msg = "Failed to persist properties in registry: " + resourcePath; - //registry.rollbackTransaction(); log.error(msg, e); throw new RegistryException(msg, e); + } finally { + releaseWriteLock(applicationId); } } private boolean propertyValueExist(Resource nodeResource, String key, String value) { List<String> properties = nodeResource.getPropertyValues(key); - if (properties == null) { - return false; - } else { - return properties.contains(value); - } + return properties != null && properties.contains(value); } @@ -178,14 +187,12 @@ public class MetadataApiRegistry implements DataStore { throws RegistryException { Registry registry = getRegistry(); String resourcePath = mainResource + applicationId; - - // We are using only super tenant registry to persist - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); - ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - + acquireWriteLock(applicationId); try { - registry.beginTransaction(); + // We are using only super tenant registry to persist + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); + ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); Resource nodeResource; if (registry.resourceExists(resourcePath)) { nodeResource = registry.get(resourcePath); @@ -195,20 +202,13 @@ public class MetadataApiRegistry implements DataStore { } nodeResource.removePropertyValue(propertyName, valueToRemove); registry.put(resourcePath, nodeResource); - registry.commitTransaction(); - - log.info(String.format("Application %s property %s value %s is removed from metadata ", - applicationId, propertyName, valueToRemove)); + log.info(String.format("Application %s property %s value %s is removed from metadata ", applicationId, + propertyName, valueToRemove)); return true; - }catch (Exception e){ - try { - registry.rollbackTransaction(); - } catch (RegistryException e1) { - if (log.isErrorEnabled()) { - log.error("Could not rollback transaction", e1); - } - } + } catch (Exception e) { throw new RegistryException("Could not remove registry resource: [resource-path] " + resourcePath, e); + } finally { + releaseWriteLock(applicationId); } } @@ -220,30 +220,33 @@ public class MetadataApiRegistry implements DataStore { * @param property * @throws RegistryException */ - public void addPropertyToCluster(String applicationId, String clusterId, Property property) throws RegistryException { + public void addPropertyToCluster(String applicationId, String clusterId, Property property) + throws RegistryException { Registry registry = getRegistry(); String resourcePath = mainResource + applicationId + "/" + clusterId; - - // We are using only super tenant registry to persist - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); - ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - - Resource nodeResource = null; - if (registry.resourceExists(resourcePath)) { - nodeResource = registry.get(resourcePath); - } else { - nodeResource = registry.newResource(); - if (log.isDebugEnabled()) { - log.debug("Registry resource is create at path for cluster" + nodeResource.getPath() + " for cluster"); + acquireWriteLock(applicationId); + try { + // We are using only super tenant registry to persist + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); + ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); + Resource nodeResource = null; + if (registry.resourceExists(resourcePath)) { + nodeResource = registry.get(resourcePath); + } else { + nodeResource = registry.newResource(); + if (log.isDebugEnabled()) { + log.debug("Registry resource created for cluster" + clusterId); + } } + nodeResource.setProperty(property.getKey(), Arrays.asList(property.getValues())); + registry.put(resourcePath, nodeResource); + log.info(String.format( + "Registry property is persisted: [resource-path] %s [Property Name] %s [Property Values] %s", + resourcePath, property.getKey(), Arrays.asList(property.getValues()))); + } finally { + releaseWriteLock(applicationId); } - - nodeResource.setProperty(property.getKey(), Arrays.asList(property.getValues())); - registry.put(resourcePath, nodeResource); - - log.info(String.format("Registry property is persisted: [resource-path] %s [Property Name] %s [Property Values] %s", - resourcePath, property.getKey(), Arrays.asList(property.getValues()))); } private UserRegistry getRegistry() throws RegistryException { @@ -262,31 +265,24 @@ public class MetadataApiRegistry implements DataStore { if (StringUtils.isBlank(applicationId)) { throw new IllegalArgumentException("Application ID can not be null"); } - - // We are using only super tenant registry to persist - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); - ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); Registry registry = getRegistry(); String resourcePath = mainResource + applicationId; + acquireWriteLock(applicationId); try { - registry.beginTransaction(); + // We are using only super tenant registry to persist + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); + ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); if (registry.resourceExists(resourcePath)) { registry.delete(resourcePath); log.info(String.format("Application [application-id ] properties removed from registry %s", applicationId)); } - registry.commitTransaction(); return true; - }catch (Exception e){ - try { - registry.rollbackTransaction(); - } catch (RegistryException e1) { - if (log.isErrorEnabled()) { - log.error("Could not rollback transaction", e1); - } - } + } catch (Exception e) { throw new RegistryException("Could not remove registry resource: [resource-path] " + resourcePath, e); + } finally { + releaseWriteLock(applicationId); } } @@ -294,30 +290,81 @@ public class MetadataApiRegistry implements DataStore { throws org.wso2.carbon.registry.api.RegistryException { Registry registry = getRegistry(); String resourcePath = mainResource + applicationId; - // We are using only super tenant registry to persist - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); - ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - + acquireWriteLock(applicationId); Resource nodeResource; - if (registry.resourceExists(resourcePath)) { - nodeResource = registry.get(resourcePath); - if (nodeResource.getProperty(propertyName) == null) { - log.info(String.format("[application-id] %s does not have a property [property-name] %s ", applicationId, - propertyName)); - return false; + try { + // We are using only super tenant registry to persist + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); + ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); + if (registry.resourceExists(resourcePath)) { + nodeResource = registry.get(resourcePath); + if (nodeResource.getProperty(propertyName) == null) { + log.info(String.format("[application-id] %s does not have a property [property-name] %s ", + applicationId, propertyName)); + return false; + } else { + nodeResource.removeProperty(propertyName); + registry.put(resourcePath, nodeResource); + } } else { - nodeResource.removeProperty(propertyName); - registry.put(resourcePath, nodeResource); + log.error("Registry resource not not found at " + resourcePath); + return false; } + + log.info(String.format("Application [application-id] %s property [property-name] %s removed from Registry ", + applicationId, propertyName)); + return true; + } finally { + releaseWriteLock(applicationId); + } + } + + public void acquireReadLock(String applicationId) { + if (applicationIdToReadWriteLockMap.get(applicationId) == null) { + throw new RuntimeException( + String.format("Invalid application [application-id] %s not found. Failed to acquire read lock.", + applicationId)); } else { - log.error("Registry resource not not found at " + resourcePath); - return false; + applicationIdToReadWriteLockMap.get(applicationId).acquireReadLock(); } + } - log.info(String.format("Application [application-id] %s property [property-name] %s removed from Registry ", - applicationId, propertyName)); - return true; + public void acquireWriteLock(String applicationId) { + if (applicationIdToReadWriteLockMap.get(applicationId) == null) { + throw new RuntimeException( + String.format("Invalid application [application-id] %s not found. Failed to acquire write lock.", + applicationId)); + } else { + applicationIdToReadWriteLockMap.get(applicationId).acquireWriteLock(); + } } + public void releaseReadLock(String applicationId) { + if (applicationIdToReadWriteLockMap.get(applicationId) == null) { + throw new RuntimeException( + String.format("Invalid application [application-id] %s not found. Failed to release read lock.", + applicationId)); + } else { + applicationIdToReadWriteLockMap.get(applicationId).releaseReadLock(); + } + } + + public void releaseWriteLock(String applicationId) { + if (applicationIdToReadWriteLockMap.get(applicationId) == null) { + throw new RuntimeException( + String.format("Invalid application [application-id] %s not found. Failed to release write lock.", + applicationId)); + } else { + applicationIdToReadWriteLockMap.get(applicationId).releaseWriteLock(); + } + } + + public static Map<String, ReadWriteLock> getApplicationIdToReadWriteLockMap() { + return applicationIdToReadWriteLockMap; + } + + public void stopTopologyReceiver() { + metadataTopologyEventReceiver.terminate(); + } }
