This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new a9a80209 CURATOR-688. Fix SharedCount/SharedValue not update after
Stat.version overflowed (#478)
a9a80209 is described below
commit a9a8020918c345fb6c3c0a099a27189cbd7cf819
Author: Hexiaoqiao <[email protected]>
AuthorDate: Tue Jun 4 10:23:19 2024 +0800
CURATOR-688. Fix SharedCount/SharedValue not update after Stat.version
overflowed (#478)
Co-authored-by: Kezhu Wang <[email protected]>
Co-authored-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
---
...lue.java => IllegalTrySetVersionException.java} | 34 ++++++++------------
.../framework/recipes/shared/SharedCount.java | 7 ++--
.../framework/recipes/shared/SharedValue.java | 37 ++++++++++++----------
.../framework/recipes/shared/VersionedValue.java | 31 ++++++++++++++----
.../framework/recipes/shared/TestSharedCount.java | 13 ++++++--
5 files changed, 74 insertions(+), 48 deletions(-)
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
similarity index 53%
copy from
curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
copy to
curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
index 64d9780b..58e83399 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
@@ -19,29 +19,21 @@
package org.apache.curator.framework.recipes.shared;
-import com.google.common.base.Preconditions;
+import org.apache.zookeeper.data.Stat;
/**
- * POJO for a version and a value
+ * Exception to alert overflowed {@link Stat#getVersion()} {@code -1} which is
not suitable in
+ * {@link SharedValue#trySetValue(VersionedValue, byte[])} and {@link
SharedCount#trySetCount(VersionedValue, int)}.
+ *
+ * <p>In case of this exception, clients have to choose:
+ * <ul>
+ * <li>Take their own risk to do a blind set.</li>
+ * <li>Update ZooKeeper cluster to solve <a
href="https://issues.apache.org/jira/browse/ZOOKEEPER-4743">ZOOKEEPER-4743</a>.</li>
+ * </ul>
*/
-public class VersionedValue<T> {
- private final int version;
- private final T value;
-
- /**
- * @param version the version
- * @param value the value (cannot be null)
- */
- VersionedValue(int version, T value) {
- this.version = version;
- this.value = Preconditions.checkNotNull(value, "value cannot be null");
- }
-
- public int getVersion() {
- return version;
- }
-
- public T getValue() {
- return value;
+public class IllegalTrySetVersionException extends IllegalArgumentException {
+ @Override
+ public String getMessage() {
+ return "overflowed Stat.version -1 is not suitable for trySet(a.k.a.
compare-and-set ZooKeeper::setData)";
}
}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index c401a3b0..2ecc528e 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionState;
+import org.apache.zookeeper.data.Stat;
/**
* Manages a shared integer. All clients watching the same path will have the
up-to-date
@@ -60,7 +61,7 @@ public class SharedCount implements Closeable,
SharedCountReader, Listenable<Sha
@Override
public VersionedValue<Integer> getVersionedValue() {
VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
- return new VersionedValue<Integer>(localValue.getVersion(),
fromBytes(localValue.getValue()));
+ return localValue.mapValue(SharedCount::fromBytes);
}
/**
@@ -102,11 +103,11 @@ public class SharedCount implements Closeable,
SharedCountReader, Listenable<Sha
* @param newCount the new value to attempt
* @return true if the change attempt was successful, false if not. If the
change
* was not successful, {@link #getCount()} will return the updated value
+ * @throws IllegalTrySetVersionException if {@link Stat#getVersion()}
overflowed to {@code -1}
* @throws Exception ZK errors, interruptions, etc.
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount)
throws Exception {
- VersionedValue<byte[]> previousCopy =
- new VersionedValue<byte[]>(previous.getVersion(),
toBytes(previous.getValue()));
+ VersionedValue<byte[]> previousCopy =
previous.mapValue(SharedCount::toBytes);
return sharedValue.trySetValue(previousCopy, toBytes(newCount));
}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 50e50edd..ddb96fcc 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
* value (considering ZK's normal consistency guarantees).
*/
public class SharedValue implements Closeable, SharedValueReader {
+ private static final int NO_ZXID = -1;
private static final int UNINITIALIZED_VERSION = -1;
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -101,8 +102,8 @@ public class SharedValue implements Closeable,
SharedValueReader {
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
this.watcher = new SharedValueCuratorWatcher();
- currentValue = new AtomicReference<VersionedValue<byte[]>>(
- new VersionedValue<byte[]>(UNINITIALIZED_VERSION,
Arrays.copyOf(seedValue, seedValue.length)));
+ currentValue = new AtomicReference<>(
+ new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION,
Arrays.copyOf(seedValue, seedValue.length)));
}
@VisibleForTesting
@@ -112,8 +113,8 @@ public class SharedValue implements Closeable,
SharedValueReader {
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
// inject watcher for testing
this.watcher = watcher;
- currentValue = new AtomicReference<VersionedValue<byte[]>>(
- new VersionedValue<byte[]>(UNINITIALIZED_VERSION,
Arrays.copyOf(seedValue, seedValue.length)));
+ currentValue = new AtomicReference<>(
+ new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION,
Arrays.copyOf(seedValue, seedValue.length)));
}
@Override
@@ -125,12 +126,11 @@ public class SharedValue implements Closeable,
SharedValueReader {
@Override
public VersionedValue<byte[]> getVersionedValue() {
VersionedValue<byte[]> localCopy = currentValue.get();
- return new VersionedValue<byte[]>(
- localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(),
localCopy.getValue().length));
+ return localCopy.mapValue(bytes -> Arrays.copyOf(bytes, bytes.length));
}
/**
- * Change the shared value value irrespective of its previous state
+ * Change the shared value irrespective of its previous state
*
* @param newValue new value
* @throws Exception ZK errors, interruptions, etc.
@@ -139,7 +139,7 @@ public class SharedValue implements Closeable,
SharedValueReader {
Preconditions.checkState(state.get() == State.STARTED, "not started");
Stat result = client.setData().forPath(path, newValue);
- updateValue(result.getVersion(), Arrays.copyOf(newValue,
newValue.length));
+ updateValue(result.getMzxid(), result.getVersion(),
Arrays.copyOf(newValue, newValue.length));
}
/**
@@ -171,19 +171,25 @@ public class SharedValue implements Closeable,
SharedValueReader {
* @param newValue the new value to attempt
* @return true if the change attempt was successful, false if not. If the
change
* was not successful, {@link #getValue()} will return the updated value
+ * @throws IllegalTrySetVersionException if {@link Stat#getVersion()}
overflowed to {@code -1}
* @throws Exception ZK errors, interruptions, etc.
*/
public boolean trySetValue(VersionedValue<byte[]> previous, byte[]
newValue) throws Exception {
Preconditions.checkState(state.get() == State.STARTED, "not started");
VersionedValue<byte[]> current = currentValue.get();
- if (previous.getVersion() != current.getVersion() ||
!Arrays.equals(previous.getValue(), current.getValue())) {
+ // Omit comparing of getVersion here, so we can test the exception
case.
+ // This affects no correctness as construction of VersionedValue is
private.
+ if (previous.getZxid() != current.getZxid() ||
!Arrays.equals(previous.getValue(), current.getValue())) {
return false;
}
+ if (previous.getVersion() == -1) {
+ throw new IllegalTrySetVersionException();
+ }
try {
Stat result =
client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
- updateValue(result.getVersion(), Arrays.copyOf(newValue,
newValue.length));
+ updateValue(result.getMzxid(), result.getVersion(),
Arrays.copyOf(newValue, newValue.length));
return true;
} catch (KeeperException.BadVersionException ignore) {
// ignore
@@ -193,14 +199,13 @@ public class SharedValue implements Closeable,
SharedValueReader {
return false;
}
- private void updateValue(int version, byte[] bytes) {
+ private void updateValue(long zxid, int version, byte[] bytes) {
while (true) {
VersionedValue<byte[]> current = currentValue.get();
- if (current.getVersion() >= version) {
- // A newer version was concurrently set.
+ if (current.getZxid() >= zxid) {
return;
}
- if (currentValue.compareAndSet(current, new
VersionedValue<byte[]>(version, bytes))) {
+ if (currentValue.compareAndSet(current, new VersionedValue<>(zxid,
version, bytes))) {
// Successfully set.
return;
}
@@ -248,14 +253,14 @@ public class SharedValue implements Closeable,
SharedValueReader {
Stat localStat = new Stat();
byte[] bytes =
client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
- updateValue(localStat.getVersion(), bytes);
+ updateValue(localStat.getMzxid(), localStat.getVersion(), bytes);
}
private final BackgroundCallback upadateAndNotifyListenerCallback = new
BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event)
throws Exception {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
- updateValue(event.getStat().getVersion(), event.getData());
+ updateValue(event.getStat().getMzxid(),
event.getStat().getVersion(), event.getData());
notifyListeners();
}
}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
index 64d9780b..64f09765 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
@@ -20,23 +20,38 @@
package org.apache.curator.framework.recipes.shared;
import com.google.common.base.Preconditions;
+import java.util.function.Function;
+import org.apache.zookeeper.data.Stat;
/**
- * POJO for a version and a value
+ * POJO for versioned value.
+ *
+ * <p>Client must never construct this but get through {@link
SharedValue#getVersionedValue()}
+ * or {@link SharedCount#getVersionedValue()}.
*/
public class VersionedValue<T> {
+ private final long zxid;
private final int version;
private final T value;
- /**
- * @param version the version
- * @param value the value (cannot be null)
- */
- VersionedValue(int version, T value) {
+ VersionedValue(long zxid, int version, T value) {
+ this.zxid = zxid;
this.version = version;
this.value = Preconditions.checkNotNull(value, "value cannot be null");
}
+ /**
+ * It is {@link Stat#getMzxid()} of the corresponding node.
+ */
+ public long getZxid() {
+ return zxid;
+ }
+
+ /**
+ * It is {@link Stat#getVersion()} of the corresponding node.
+ *
+ * <p>It is known that this will overflow and hence not monotonic.
+ */
public int getVersion() {
return version;
}
@@ -44,4 +59,8 @@ public class VersionedValue<T> {
public T getValue() {
return value;
}
+
+ <R> VersionedValue<R> mapValue(Function<T, R> f) {
+ return new VersionedValue<>(zxid, version, f.apply(value));
+ }
}
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index b7b56c06..6e843883 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.shared;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -51,6 +52,7 @@ import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.junit.jupiter.api.Test;
+@SuppressWarnings("deprecation")
public class TestSharedCount extends CuratorTestBase {
@Test
public void testMultiClients() throws Exception {
@@ -206,13 +208,20 @@ public class TestSharedCount extends CuratorTestBase {
assertEquals(count.getCount(), 10);
// Wrong value
- assertFalse(count.trySetCount(new VersionedValue<Integer>(3, 20),
7));
+ assertFalse(count.trySetCount(new
VersionedValue<>(current.getZxid(), 3, 20), 7));
// Wrong version
- assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 10),
7));
+ assertFalse(count.trySetCount(new
VersionedValue<>(current.getZxid(), 10, 10), 7));
+ assertFalse(count.trySetCount(new
VersionedValue<>(current.getZxid() + 1, 3, 10), 7));
// Server changed
client.setData().forPath("/count", SharedCount.toBytes(88));
assertFalse(count.trySetCount(current, 234));
+
+ assertThrows(IllegalTrySetVersionException.class, () -> {
+ VersionedValue<Integer> cached = count.getVersionedValue();
+ VersionedValue<Integer> illegal = new
VersionedValue<>(cached.getZxid(), -1, cached.getValue());
+ count.trySetCount(illegal, 20);
+ });
} finally {
CloseableUtils.closeQuietly(count);
CloseableUtils.closeQuietly(client);