This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 33063bb Make readModifyUpdate in MetadataCacheImpl thread-safe (#9900)
33063bb is described below
commit 33063bb69ab9d973e614e4ca8de6be626301c378
Author: feynmanlin <[email protected]>
AuthorDate: Mon Mar 22 11:26:51 2021 +0800
Make readModifyUpdate in MetadataCacheImpl thread-safe (#9900)
### Motivation
Modification of Namespace Policies is currently not protected by a lock, so
multiple threads may modify the same local Policies object at the same time,
which can cause thread-safety issues.
The Policies object also contains non-thread-safe collections such as
HashMap and HashSet. Concurrent operations on these collections are likewise
not thread-safe. E.g., see https://github.com/apache/pulsar/issues/9711
### Modifications
Use a clone + CAS (compare-and-set) approach to ensure thread safety:
Visibility: The cached object is cloned, only the clone is modified, and
the entire object is eventually written back. The cached object in Caffeine
is immediately visible to other threads.
Atomicity: A modification replaces the entire object with the modified
clone. There is no intermediate state in which only some attribute values
have been modified, so the update is atomic.
Ordering: Guaranteed by the implementation class of MetadataCache. Each
cached object now corresponds to a version number, and CAS is used to write
it back. If the write-back fails, the read -> clone -> modify -> update
sequence is performed again to preserve ordering.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 4 +-
.../metadata/cache/impl/MetadataCacheImpl.java | 13 +++-
.../apache/pulsar/metadata/MetadataCacheTest.java | 83 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d212fb5..358db35 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2549,8 +2549,8 @@ public abstract class NamespacesBase extends
AdminResource {
private void updatePolicies(String path, Function<Policies, Policies>
updateFunction) {
try {
// Force to read the data s.t. the watch to the cache content is
setup.
- namespaceResources().set(path(POLICIES, namespaceName.toString()),
updateFunction);
- log.info("[{}] Successfully updated the on namespace {}",
clientAppId(), path, namespaceName);
+ namespaceResources().set(path, updateFunction);
+ log.info("[{}] Successfully updated the {} on namespace {}",
clientAppId(), path, namespaceName);
} catch (NotFoundException e) {
log.warn("[{}] Namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 23f8e7b..c9a4284 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -24,6 +24,7 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
@@ -46,7 +47,6 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializat
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.Stat;
-import org.checkerframework.checker.nullness.Opt;
public class MetadataCacheImpl<T> implements MetadataCache<T>,
Consumer<Notification> {
@@ -127,7 +127,14 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
long expectedVersion;
if (optEntry.isPresent()) {
- currentValue = Optional.of(optEntry.get().getKey());
+ T clone;
+ try {
+ // Use clone and CAS zk to ensure thread safety
+ clone =
serde.deserialize(serde.serialize(optEntry.get().getKey()));
+ } catch (IOException e) {
+ return FutureUtils.exception(e);
+ }
+ currentValue = Optional.of(clone);
expectedVersion =
optEntry.get().getValue().getVersion();
} else {
currentValue = Optional.empty();
@@ -166,6 +173,8 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
T newValueObj;
byte[] newValue;
try {
+ // Use clone and CAS zk to ensure thread safety
+ currentValue =
serde.deserialize(serde.serialize(currentValue));
newValueObj = modifyFunction.apply(currentValue);
newValue = serde.serialize(newValueObj);
} catch (Throwable t) {
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index e649b7a..5c68cfb 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -27,10 +30,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -190,6 +196,83 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
}
@Test(dataProvider = "impl")
+ public void testReadCloned(String provider, String url) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<Policies> objCache =
store.getMetadataCache(Policies.class);
+ String path = "/testReadCloned-policies";
+ // init cache
+ Policies policies = new Policies();
+ policies.max_unacked_messages_per_consumer = 100;
+ policies.replication_clusters.add("1");
+ objCache.create(path, policies).get();
+
+ Policies tempPolicies = objCache.get(path).get().get();
+ assertSame(tempPolicies, objCache.get(path).get().get());
+ AtomicReference<Policies> reference = new AtomicReference<>(new
Policies());
+ AtomicReference<Policies> reference2 = new AtomicReference<>(new
Policies());
+
+ objCache.readModifyUpdate(path, (policies1) -> {
+ assertNotSame(policies1, tempPolicies);
+ reference.set(policies1);
+ policies1.max_unacked_messages_per_consumer = 200;
+ return policies1;
+ }).get();
+ objCache.readModifyUpdate(path, (policies1) -> {
+ assertNotSame(policies1, tempPolicies);
+ reference2.set(policies1);
+ policies1.max_unacked_messages_per_consumer = 300;
+ return policies1;
+ }).get();
+ //The original object should not be modified
+
assertEquals(tempPolicies.max_unacked_messages_per_consumer.intValue(), 100);
+ assertNotSame(reference.get(), reference2.get());
+ assertNotEquals(reference.get().max_unacked_messages_per_consumer
+ , reference2.get().max_unacked_messages_per_consumer);
+
+ }
+
+ @Test(dataProvider = "impl")
+ public void testCloneInReadModifyUpdateOrCreate(String provider, String
url) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<Policies> objCache =
store.getMetadataCache(Policies.class);
+ String path = "/testCloneInReadModifyUpdateOrCreate-policies";
+ // init cache
+ Policies policies = new Policies();
+ policies.max_unacked_messages_per_consumer = 100;
+ objCache.create(path, policies).get();
+
+ Policies tempPolicies = objCache.get(path).get().get();
+ assertSame(tempPolicies, objCache.get(path).get().get());
+ AtomicReference<Policies> reference = new AtomicReference<>(new
Policies());
+ AtomicReference<Policies> reference2 = new AtomicReference<>(new
Policies());
+
+ objCache.readModifyUpdateOrCreate(path, (policies1) -> {
+ Policies policiesRef = policies1.get();
+ assertNotSame(policiesRef, tempPolicies);
+ reference.set(policiesRef);
+ policiesRef.max_unacked_messages_per_consumer = 200;
+ return policiesRef;
+ }).get();
+ objCache.readModifyUpdateOrCreate(path, (policies1) -> {
+ Policies policiesRef = policies1.get();
+ assertNotSame(policiesRef, tempPolicies);
+ reference2.set(policiesRef);
+ policiesRef.max_unacked_messages_per_consumer = 300;
+ return policiesRef;
+ }).get();
+ //The original object should not be modified
+
assertEquals(tempPolicies.max_unacked_messages_per_consumer.intValue(), 100);
+ assertNotSame(reference.get(), reference2.get());
+ assertNotEquals(reference.get().max_unacked_messages_per_consumer
+ , reference2.get().max_unacked_messages_per_consumer);
+
+ }
+
+ @Test(dataProvider = "impl")
public void readModifyUpdate(String provider, String url) throws Exception
{
@Cleanup
MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());