This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 33063bb  Make readModifyUpdate in MetadataCacheImpl thread-safe (#9900)
33063bb is described below

commit 33063bb69ab9d973e614e4ca8de6be626301c378
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 22 11:26:51 2021 +0800

    Make readModifyUpdate in MetadataCacheImpl thread-safe (#9900)
    
    ### Motivation
    Now that the modification of Namespace Policies is not locked, multiple 
threads may modify the same local Policies object at the same time, which may 
cause thread safety issues.
    The Policies object also contains non-thread-safe collections such as 
HashMap and HashSet. Concurrent operations on these objects also have 
thread-safety issues. E.g https://github.com/apache/pulsar/issues/9711
    
    ### Modifications
    Use clone+CAS method to ensure thread safety
    Visibility: The cache will be cloned, only the cloned object will be 
modified, and the entire object will be written back eventually. The cached 
object in caffeine will be immediately visible to other threads
    Atomicity: Modification refers to the replacement of the entire cloned 
object. There will be no intermediate state where some attribute values are 
modified, so it is atomic
    Orderliness: Guaranteed by the implementation class of MetadataCache. Now 
the cached object corresponds to a version number. Use CAS to write back. If 
the writeback fails, it will read -> clone -> modify -> update again to ensure 
order.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 +-
 .../metadata/cache/impl/MetadataCacheImpl.java     | 13 +++-
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 83 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d212fb5..358db35 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2549,8 +2549,8 @@ public abstract class NamespacesBase extends 
AdminResource {
    private void updatePolicies(String path, Function<Policies, Policies> 
updateFunction) {
        try {
            // Force to read the data s.t. the watch to the cache content is 
setup.
-           namespaceResources().set(path(POLICIES, namespaceName.toString()), 
updateFunction);
-           log.info("[{}] Successfully updated the on namespace {}", 
clientAppId(), path, namespaceName);
+           namespaceResources().set(path, updateFunction);
+           log.info("[{}] Successfully updated the {} on namespace {}", 
clientAppId(), path, namespaceName);
        } catch (NotFoundException e) {
            log.warn("[{}] Namespace {}: does not exist", clientAppId(), 
namespaceName);
            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 23f8e7b..c9a4284 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -24,6 +24,7 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +47,6 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializat
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.Stat;
-import org.checkerframework.checker.nullness.Opt;
 
 public class MetadataCacheImpl<T> implements MetadataCache<T>, 
Consumer<Notification> {
 
@@ -127,7 +127,14 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     long expectedVersion;
 
                     if (optEntry.isPresent()) {
-                        currentValue = Optional.of(optEntry.get().getKey());
+                        T clone;
+                        try {
+                            // Use clone and CAS zk to ensure thread safety
+                            clone = 
serde.deserialize(serde.serialize(optEntry.get().getKey()));
+                        } catch (IOException e) {
+                            return FutureUtils.exception(e);
+                        }
+                        currentValue = Optional.of(clone);
                         expectedVersion = 
optEntry.get().getValue().getVersion();
                     } else {
                         currentValue = Optional.empty();
@@ -166,6 +173,8 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     T newValueObj;
                     byte[] newValue;
                     try {
+                        // Use clone and CAS zk to ensure thread safety
+                        currentValue = 
serde.deserialize(serde.serialize(currentValue));
                         newValueObj = modifyFunction.apply(currentValue);
                         newValue = serde.serialize(newValueObj);
                     } catch (Throwable t) {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index e649b7a..5c68cfb 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -27,10 +30,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+
 import lombok.AllArgsConstructor;
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -190,6 +196,83 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
     }
 
     @Test(dataProvider = "impl")
+    public void testReadCloned(String provider, String url) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<Policies> objCache = 
store.getMetadataCache(Policies.class);
+        String path = "/testReadCloned-policies";
+        // init cache
+        Policies policies = new Policies();
+        policies.max_unacked_messages_per_consumer = 100;
+        policies.replication_clusters.add("1");
+        objCache.create(path, policies).get();
+
+        Policies tempPolicies = objCache.get(path).get().get();
+        assertSame(tempPolicies, objCache.get(path).get().get());
+        AtomicReference<Policies> reference = new AtomicReference<>(new 
Policies());
+        AtomicReference<Policies> reference2 = new AtomicReference<>(new 
Policies());
+
+        objCache.readModifyUpdate(path, (policies1) -> {
+            assertNotSame(policies1, tempPolicies);
+            reference.set(policies1);
+            policies1.max_unacked_messages_per_consumer = 200;
+            return policies1;
+        }).get();
+        objCache.readModifyUpdate(path, (policies1) -> {
+            assertNotSame(policies1, tempPolicies);
+            reference2.set(policies1);
+            policies1.max_unacked_messages_per_consumer = 300;
+            return policies1;
+        }).get();
+        //The original object should not be modified
+        
assertEquals(tempPolicies.max_unacked_messages_per_consumer.intValue(), 100);
+        assertNotSame(reference.get(), reference2.get());
+        assertNotEquals(reference.get().max_unacked_messages_per_consumer
+                , reference2.get().max_unacked_messages_per_consumer);
+
+    }
+
+    @Test(dataProvider = "impl")
+    public void testCloneInReadModifyUpdateOrCreate(String provider, String 
url) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<Policies> objCache = 
store.getMetadataCache(Policies.class);
+        String path = "/testCloneInReadModifyUpdateOrCreate-policies";
+        // init cache
+        Policies policies = new Policies();
+        policies.max_unacked_messages_per_consumer = 100;
+        objCache.create(path, policies).get();
+
+        Policies tempPolicies = objCache.get(path).get().get();
+        assertSame(tempPolicies, objCache.get(path).get().get());
+        AtomicReference<Policies> reference = new AtomicReference<>(new 
Policies());
+        AtomicReference<Policies> reference2 = new AtomicReference<>(new 
Policies());
+
+        objCache.readModifyUpdateOrCreate(path, (policies1) -> {
+            Policies policiesRef = policies1.get();
+            assertNotSame(policiesRef, tempPolicies);
+            reference.set(policiesRef);
+            policiesRef.max_unacked_messages_per_consumer = 200;
+            return policiesRef;
+        }).get();
+        objCache.readModifyUpdateOrCreate(path, (policies1) -> {
+            Policies policiesRef = policies1.get();
+            assertNotSame(policiesRef, tempPolicies);
+            reference2.set(policiesRef);
+            policiesRef.max_unacked_messages_per_consumer = 300;
+            return policiesRef;
+        }).get();
+        //The original object should not be modified
+        
assertEquals(tempPolicies.max_unacked_messages_per_consumer.intValue(), 100);
+        assertNotSame(reference.get(), reference2.get());
+        assertNotEquals(reference.get().max_unacked_messages_per_consumer
+                , reference2.get().max_unacked_messages_per_consumer);
+
+    }
+
+    @Test(dataProvider = "impl")
     public void readModifyUpdate(String provider, String url) throws Exception 
{
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());

Reply via email to