This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new a9a80209 CURATOR-688. Fix SharedCount/SharedValue not update after 
Stat.version overflowed (#478)
a9a80209 is described below

commit a9a8020918c345fb6c3c0a099a27189cbd7cf819
Author: Hexiaoqiao <[email protected]>
AuthorDate: Tue Jun 4 10:23:19 2024 +0800

    CURATOR-688. Fix SharedCount/SharedValue not update after Stat.version 
overflowed (#478)
    
    Co-authored-by: Kezhu Wang <[email protected]>
    Co-authored-by: tison <[email protected]>
    Signed-off-by: tison <[email protected]>
---
 ...lue.java => IllegalTrySetVersionException.java} | 34 ++++++++------------
 .../framework/recipes/shared/SharedCount.java      |  7 ++--
 .../framework/recipes/shared/SharedValue.java      | 37 ++++++++++++----------
 .../framework/recipes/shared/VersionedValue.java   | 31 ++++++++++++++----
 .../framework/recipes/shared/TestSharedCount.java  | 13 ++++++--
 5 files changed, 74 insertions(+), 48 deletions(-)

diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
similarity index 53%
copy from 
curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
copy to 
curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
index 64d9780b..58e83399 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
@@ -19,29 +19,21 @@
 
 package org.apache.curator.framework.recipes.shared;
 
-import com.google.common.base.Preconditions;
+import org.apache.zookeeper.data.Stat;
 
 /**
- * POJO for a version and a value
+ * Exception to alert overflowed {@link Stat#getVersion()} {@code -1} which is 
not suitable in
+ * {@link SharedValue#trySetValue(VersionedValue, byte[])} and {@link 
SharedCount#trySetCount(VersionedValue, int)}.
+ *
+ * <p>In case of this exception, clients have to choose:
+ * <ul>
+ *     <li>Take their own risk to do a blind set.</li>
+ *     <li>Update ZooKeeper cluster to solve <a 
href="https://issues.apache.org/jira/browse/ZOOKEEPER-4743";>ZOOKEEPER-4743</a>.</li>
+ * </ul>
  */
-public class VersionedValue<T> {
-    private final int version;
-    private final T value;
-
-    /**
-     * @param version the version
-     * @param value the value (cannot be null)
-     */
-    VersionedValue(int version, T value) {
-        this.version = version;
-        this.value = Preconditions.checkNotNull(value, "value cannot be null");
-    }
-
-    public int getVersion() {
-        return version;
-    }
-
-    public T getValue() {
-        return value;
+public class IllegalTrySetVersionException extends IllegalArgumentException {
+    @Override
+    public String getMessage() {
+        return "overflowed Stat.version -1 is not suitable for trySet(a.k.a. 
compare-and-set ZooKeeper::setData)";
     }
 }
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index c401a3b0..2ecc528e 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executor;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * Manages a shared integer. All clients watching the same path will have the 
up-to-date
@@ -60,7 +61,7 @@ public class SharedCount implements Closeable, 
SharedCountReader, Listenable<Sha
     @Override
     public VersionedValue<Integer> getVersionedValue() {
         VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
-        return new VersionedValue<Integer>(localValue.getVersion(), 
fromBytes(localValue.getValue()));
+        return localValue.mapValue(SharedCount::fromBytes);
     }
 
     /**
@@ -102,11 +103,11 @@ public class SharedCount implements Closeable, 
SharedCountReader, Listenable<Sha
      * @param newCount the new value to attempt
      * @return true if the change attempt was successful, false if not. If the 
change
      * was not successful, {@link #getCount()} will return the updated value
+     * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} 
overflowed to {@code -1}
      * @throws Exception ZK errors, interruptions, etc.
      */
     public boolean trySetCount(VersionedValue<Integer> previous, int newCount) 
throws Exception {
-        VersionedValue<byte[]> previousCopy =
-                new VersionedValue<byte[]>(previous.getVersion(), 
toBytes(previous.getValue()));
+        VersionedValue<byte[]> previousCopy = 
previous.mapValue(SharedCount::toBytes);
         return sharedValue.trySetValue(previousCopy, toBytes(newCount));
     }
 
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 50e50edd..ddb96fcc 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
  * value (considering ZK's normal consistency guarantees).
  */
 public class SharedValue implements Closeable, SharedValueReader {
+    private static final int NO_ZXID = -1;
     private static final int UNINITIALIZED_VERSION = -1;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -101,8 +102,8 @@ public class SharedValue implements Closeable, 
SharedValueReader {
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         this.watcher = new SharedValueCuratorWatcher();
-        currentValue = new AtomicReference<VersionedValue<byte[]>>(
-                new VersionedValue<byte[]>(UNINITIALIZED_VERSION, 
Arrays.copyOf(seedValue, seedValue.length)));
+        currentValue = new AtomicReference<>(
+                new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, 
Arrays.copyOf(seedValue, seedValue.length)));
     }
 
     @VisibleForTesting
@@ -112,8 +113,8 @@ public class SharedValue implements Closeable, 
SharedValueReader {
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         // inject watcher for testing
         this.watcher = watcher;
-        currentValue = new AtomicReference<VersionedValue<byte[]>>(
-                new VersionedValue<byte[]>(UNINITIALIZED_VERSION, 
Arrays.copyOf(seedValue, seedValue.length)));
+        currentValue = new AtomicReference<>(
+                new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, 
Arrays.copyOf(seedValue, seedValue.length)));
     }
 
     @Override
@@ -125,12 +126,11 @@ public class SharedValue implements Closeable, 
SharedValueReader {
     @Override
     public VersionedValue<byte[]> getVersionedValue() {
         VersionedValue<byte[]> localCopy = currentValue.get();
-        return new VersionedValue<byte[]>(
-                localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(), 
localCopy.getValue().length));
+        return localCopy.mapValue(bytes -> Arrays.copyOf(bytes, bytes.length));
     }
 
     /**
-     * Change the shared value value irrespective of its previous state
+     * Change the shared value irrespective of its previous state
      *
      * @param newValue new value
      * @throws Exception ZK errors, interruptions, etc.
@@ -139,7 +139,7 @@ public class SharedValue implements Closeable, 
SharedValueReader {
         Preconditions.checkState(state.get() == State.STARTED, "not started");
 
         Stat result = client.setData().forPath(path, newValue);
-        updateValue(result.getVersion(), Arrays.copyOf(newValue, 
newValue.length));
+        updateValue(result.getMzxid(), result.getVersion(), 
Arrays.copyOf(newValue, newValue.length));
     }
 
     /**
@@ -171,19 +171,25 @@ public class SharedValue implements Closeable, 
SharedValueReader {
      * @param newValue the new value to attempt
      * @return true if the change attempt was successful, false if not. If the 
change
      * was not successful, {@link #getValue()} will return the updated value
+     * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} 
overflowed to {@code -1}
      * @throws Exception ZK errors, interruptions, etc.
      */
     public boolean trySetValue(VersionedValue<byte[]> previous, byte[] 
newValue) throws Exception {
         Preconditions.checkState(state.get() == State.STARTED, "not started");
 
         VersionedValue<byte[]> current = currentValue.get();
-        if (previous.getVersion() != current.getVersion() || 
!Arrays.equals(previous.getValue(), current.getValue())) {
+        // Omit comparing of getVersion here, so we can test the exception 
case.
+        // This affects no correctness as construction of VersionedValue is 
private.
+        if (previous.getZxid() != current.getZxid() || 
!Arrays.equals(previous.getValue(), current.getValue())) {
             return false;
         }
+        if (previous.getVersion() == -1) {
+            throw new IllegalTrySetVersionException();
+        }
 
         try {
             Stat result = 
client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
-            updateValue(result.getVersion(), Arrays.copyOf(newValue, 
newValue.length));
+            updateValue(result.getMzxid(), result.getVersion(), 
Arrays.copyOf(newValue, newValue.length));
             return true;
         } catch (KeeperException.BadVersionException ignore) {
             // ignore
@@ -193,14 +199,13 @@ public class SharedValue implements Closeable, 
SharedValueReader {
         return false;
     }
 
-    private void updateValue(int version, byte[] bytes) {
+    private void updateValue(long zxid, int version, byte[] bytes) {
         while (true) {
             VersionedValue<byte[]> current = currentValue.get();
-            if (current.getVersion() >= version) {
-                // A newer version was concurrently set.
+            if (current.getZxid() >= zxid) {
                 return;
             }
-            if (currentValue.compareAndSet(current, new 
VersionedValue<byte[]>(version, bytes))) {
+            if (currentValue.compareAndSet(current, new VersionedValue<>(zxid, 
version, bytes))) {
                 // Successfully set.
                 return;
             }
@@ -248,14 +253,14 @@ public class SharedValue implements Closeable, 
SharedValueReader {
         Stat localStat = new Stat();
         byte[] bytes =
                 
client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
-        updateValue(localStat.getVersion(), bytes);
+        updateValue(localStat.getMzxid(), localStat.getVersion(), bytes);
     }
 
     private final BackgroundCallback upadateAndNotifyListenerCallback = new 
BackgroundCallback() {
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) 
throws Exception {
             if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
-                updateValue(event.getStat().getVersion(), event.getData());
+                updateValue(event.getStat().getMzxid(), 
event.getStat().getVersion(), event.getData());
                 notifyListeners();
             }
         }
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
index 64d9780b..64f09765 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
@@ -20,23 +20,38 @@
 package org.apache.curator.framework.recipes.shared;
 
 import com.google.common.base.Preconditions;
+import java.util.function.Function;
+import org.apache.zookeeper.data.Stat;
 
 /**
- * POJO for a version and a value
+ * POJO for versioned value.
+ *
+ * <p>Client must never construct this but get through {@link 
SharedValue#getVersionedValue()}
+ * or {@link SharedCount#getVersionedValue()}.
  */
 public class VersionedValue<T> {
+    private final long zxid;
     private final int version;
     private final T value;
 
-    /**
-     * @param version the version
-     * @param value the value (cannot be null)
-     */
-    VersionedValue(int version, T value) {
+    VersionedValue(long zxid, int version, T value) {
+        this.zxid = zxid;
         this.version = version;
         this.value = Preconditions.checkNotNull(value, "value cannot be null");
     }
 
+    /**
+     * It is {@link Stat#getMzxid()} of the corresponding node.
+     */
+    public long getZxid() {
+        return zxid;
+    }
+
+    /**
+     * It is {@link Stat#getVersion()} of the corresponding node.
+     *
+     * <p>It is known that this will overflow and hence not monotonic.
+     */
     public int getVersion() {
         return version;
     }
@@ -44,4 +59,8 @@ public class VersionedValue<T> {
     public T getValue() {
         return value;
     }
+
+    <R> VersionedValue<R> mapValue(Function<T, R> f) {
+        return new VersionedValue<>(zxid, version, f.apply(value));
+    }
 }
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index b7b56c06..6e843883 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.shared;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -51,6 +52,7 @@ import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.WatchedEvent;
 import org.junit.jupiter.api.Test;
 
+@SuppressWarnings("deprecation")
 public class TestSharedCount extends CuratorTestBase {
     @Test
     public void testMultiClients() throws Exception {
@@ -206,13 +208,20 @@ public class TestSharedCount extends CuratorTestBase {
             assertEquals(count.getCount(), 10);
 
             // Wrong value
-            assertFalse(count.trySetCount(new VersionedValue<Integer>(3, 20), 
7));
+            assertFalse(count.trySetCount(new 
VersionedValue<>(current.getZxid(), 3, 20), 7));
             // Wrong version
-            assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 10), 
7));
+            assertFalse(count.trySetCount(new 
VersionedValue<>(current.getZxid(), 10, 10), 7));
+            assertFalse(count.trySetCount(new 
VersionedValue<>(current.getZxid() + 1, 3, 10), 7));
 
             // Server changed
             client.setData().forPath("/count", SharedCount.toBytes(88));
             assertFalse(count.trySetCount(current, 234));
+
+            assertThrows(IllegalTrySetVersionException.class, () -> {
+                VersionedValue<Integer> cached = count.getVersionedValue();
+                VersionedValue<Integer> illegal = new 
VersionedValue<>(cached.getZxid(), -1, cached.getValue());
+                count.trySetCount(illegal, 20);
+            });
         } finally {
             CloseableUtils.closeQuietly(count);
             CloseableUtils.closeQuietly(client);

Reply via email to