This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 18543788669af270b51af291aac6185068896ad5 Author: tison <[email protected]> AuthorDate: Fri Sep 15 02:05:22 2023 +0800 revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution Signed-off-by: tison <[email protected]> --- .../framework/recipes/cache/CuratorCacheImpl.java | 35 +++------------- .../framework/recipes/cache/OutstandingOps.java | 49 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index 50760dd2..23268f5e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -30,7 +30,6 @@ import com.google.common.collect.Sets; import java.util.Collections; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; @@ -61,28 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); private final Consumer<Exception> exceptionHandler; - private final Phaser rootPhaser = new Phaser() { - @Override - protected boolean onAdvance(int phase, int registeredParties) { - callListeners(CuratorCacheListener::initialized); - synchronized (CuratorCacheImpl.this) { - currentChildPhaser = rootPhaser; - } - return true; - } - }; - - private Phaser currentChildPhaser = new Phaser(rootPhaser); - - private synchronized Phaser getPhaserAndRegister() { - if (currentChildPhaser.getRegisteredParties() >= 0xffff) { - currentChildPhaser = new Phaser(rootPhaser); - } - - currentChildPhaser.register(); - - return currentChildPhaser; 
- } + private final OutstandingOps outstandingOps = + new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized)); private enum State { LATENT, @@ -199,8 +178,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { return; // children haven't changed } - final Phaser outstandingOps = getPhaserAndRegister(); - try { BackgroundCallback callback = (__, event) -> { if (event.getResultCode() == OK.intValue()) { @@ -210,9 +187,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { } else { handleException(event); } - outstandingOps.arriveAndDeregister(); + outstandingOps.decrement(); }; + outstandingOps.increment(); client.getChildren().inBackground(callback).forPath(fromPath); } catch (Exception e) { handleException(e); @@ -224,8 +202,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { return; } - final Phaser outstandingOps = getPhaserAndRegister(); - try { BackgroundCallback callback = (__, event) -> { if (event.getResultCode() == OK.intValue()) { @@ -238,9 +214,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge { } else { handleException(event); } - outstandingOps.arriveAndDeregister(); + outstandingOps.decrement(); }; + outstandingOps.increment(); if (compressedData) { client.getData().decompressed().inBackground(callback).forPath(fromPath); } else { diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java new file mode 100644 index 00000000..81a1ac26 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +class OutstandingOps { + private final AtomicReference<Runnable> completionProc; + private final AtomicLong count = new AtomicLong(0); + private volatile boolean active = true; + + OutstandingOps(Runnable completionProc) { + this.completionProc = new AtomicReference<>(completionProc); + } + + void increment() { + if (active) { + count.incrementAndGet(); + } + } + + void decrement() { + if (active && (count.decrementAndGet() == 0)) { + Runnable proc = completionProc.getAndSet(null); + if (proc != null) { + active = false; + proc.run(); + } + } + } +}
