This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0fcaea8db RATIS-1933. Two improvements on ReferenceCountedObject. (#963)
0fcaea8db is described below
commit 0fcaea8db16bba45e555632cdb7f975ad883f333
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Nov 10 08:43:55 2023 -0800
RATIS-1933. Two improvements on ReferenceCountedObject. (#963)
---
.../apache/ratis/util/ReferenceCountedObject.java | 63 ++++++++++++++++++----
.../function/UncheckedAutoCloseableSupplier.java | 30 +++++++++++
.../ratis/netty/server/DataStreamManagement.java | 9 ++--
.../ratis/util/TestReferenceCountedObject.java | 43 ++++++++++++---
4 files changed, 121 insertions(+), 24 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index fec82f099..0dd378dc0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -17,23 +17,27 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
/**
* A reference-counted object can be retained for later use
* and then be released for returning the resource.
- *
+ * <p>
* - When the object is retained, the reference count is incremented by 1.
- *
+ * <p>
* - When the object is released, the reference count is decremented by 1.
- *
+ * <p>
* - If the object is retained, it must be released afterward.
* Otherwise, the object will not be returned, and it will cause a resource leak.
- *
+ * <p>
* - If the object is retained multiple times,
* it must be released the same number of times.
- *
+ * <p>
* - If the object has been retained and then completely released (i.e. the reference count becomes 0),
* it must not be retained/released/accessed anymore since it may have been allocated for other use.
*
@@ -46,7 +50,7 @@ public interface ReferenceCountedObject<T> {
/**
* Retain the object for later use.
* The reference count will be increased by 1.
- *
+ * <p>
* The {@link #release()} method must be invoked afterward.
* Otherwise, the object is not returned, and it will cause a resource leak.
*
@@ -54,6 +58,36 @@ public interface ReferenceCountedObject<T> {
*/
T retain();
+ /**
+ * The same as {@link #retain()} except that this method returns a {@link UncheckedAutoCloseableSupplier}.
+ *
+ * @return a {@link UncheckedAutoCloseableSupplier}
+ * where {@link java.util.function.Supplier#get()} will return the retained object,
+ * i.e. the object returned by {@link #retain()},
+ * and calling {@link UncheckedAutoCloseable#close()} one or more times
+ * is the same as calling {@link #release()} once (idempotent).
+ */
+ default UncheckedAutoCloseableSupplier<T> retainAndReleaseOnClose() {
+ final T retained = retain();
+ final AtomicBoolean closed = new AtomicBoolean();
+ return new UncheckedAutoCloseableSupplier<T>() {
+ @Override
+ public T get() {
+ if (closed.get()) {
+ throw new IllegalStateException("Already closed");
+ }
+ return retained;
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ release();
+ }
+ }
+ };
+ }
+
/**
* Release the object.
* The reference count will be decreased by 1.
@@ -64,7 +98,7 @@ public interface ReferenceCountedObject<T> {
/** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
static <V> ReferenceCountedObject<V> wrap(V value) {
- return wrap(value, () -> {}, () -> {});
+ return wrap(value, () -> {}, ignored -> {});
}
/**
@@ -72,11 +106,12 @@ public interface ReferenceCountedObject<T> {
*
* @param value the value being wrapped.
* @param retainMethod a method to run when {@link #retain()} is invoked.
- * @param releaseMethod a method to run when {@link #release()} is invoked.
+ * @param releaseMethod a method to run when {@link #release()} is invoked,
+ * where the method takes a boolean which is the same as the one returned by {@link #release()}.
* @param <V> The value type.
* @return the wrapped reference-counted object.
*/
- static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod,
Runnable releaseMethod) {
+ static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod,
Consumer<Boolean> releaseMethod) {
Objects.requireNonNull(value, "value == null");
Objects.requireNonNull(retainMethod, "retainMethod == null");
Objects.requireNonNull(releaseMethod, "releaseMethod == null");
@@ -118,9 +153,15 @@ public interface ReferenceCountedObject<T> {
} else if (previous == 0) {
throw new IllegalStateException("Failed to release: object has not
yet been retained.");
}
- releaseMethod.run();
- return previous == 1;
+ final boolean completedReleased = previous == 1;
+ releaseMethod.accept(completedReleased);
+ return completedReleased;
}
};
}
+
+ /** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
+ static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod,
Runnable releaseMethod) {
+ return wrap(value, retainMethod, ignored -> releaseMethod.run());
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/UncheckedAutoCloseableSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/function/UncheckedAutoCloseableSupplier.java
new file mode 100644
index 000000000..de4c1eff4
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/UncheckedAutoCloseableSupplier.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import org.apache.ratis.util.UncheckedAutoCloseable;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link Supplier} which is also {@link UncheckedAutoCloseable}.
+ *
+ * @param <T> the type of the {@link Supplier}.
+ */
+public interface UncheckedAutoCloseableSupplier<T> extends
UncheckedAutoCloseable, Supplier<T> {
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index dd9a49929..695cc9645 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -59,6 +59,7 @@ import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -327,14 +328,12 @@ public class DataStreamManagement {
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
- final ReferenceCountedObject<ByteBuffer> wrapped =
ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
- wrapped.retain();
- try {
+ final ReferenceCountedObject<ByteBuffer> wrapped =
ReferenceCountedObject.wrap(
+ buffer, buf::retain, ignored -> buf.release());
+ try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
byteWritten += channel.write(wrapped);
} catch (Throwable t) {
throw new CompletionException(t);
- } finally {
- wrapped.release();
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
index 5a855857a..fe58b92da 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.Assert;
import org.junit.Test;
@@ -58,17 +59,31 @@ public class TestReferenceCountedObject {
Assert.assertEquals(value, ref.retain());
assertValues(retained, 1, released, 0);
- Assert.assertEquals(value, ref.retain());
- assertValues(retained, 2, released, 0);
-
- assertRelease(ref, retained, 2, released, 1);
+ try(UncheckedAutoCloseableSupplier<String> auto =
ref.retainAndReleaseOnClose()) {
+ final String got = auto.get();
+ Assert.assertEquals(value, got);
+ Assert.assertSame(got, auto.get()); // it should return the same object.
+ assertValues(retained, 2, released, 0);
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+ assertValues(retained, 2, released, 1);
- Assert.assertEquals(value, ref.retain());
+ final UncheckedAutoCloseableSupplier<String> notClosing =
ref.retainAndReleaseOnClose();
+ Assert.assertEquals(value, notClosing.get());
assertValues(retained, 3, released, 1);
-
assertRelease(ref, retained, 3, released, 2);
- assertRelease(ref, retained, 3, released, 3);
+ final UncheckedAutoCloseableSupplier<String> auto =
ref.retainAndReleaseOnClose();
+ Assert.assertEquals(value, auto.get());
+ assertValues(retained, 4, released, 2);
+ auto.close();
+ assertValues(retained, 4, released, 3);
+ auto.close(); // close() is idempotent.
+ assertValues(retained, 4, released, 3);
+
+ // completely released
+ assertRelease(ref, retained, 4, released, 4);
try {
ref.get();
@@ -84,6 +99,12 @@ public class TestReferenceCountedObject {
e.printStackTrace(System.out);
}
+ try(UncheckedAutoCloseable ignore = ref.retainAndReleaseOnClose()) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+
try {
ref.release();
Assert.fail();
@@ -94,7 +115,7 @@ public class TestReferenceCountedObject {
@Test(timeout = 1000)
public void testReleaseWithoutRetaining() {
- final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("",
() -> {}, () -> {});
+ final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("");
try {
ref.release();
@@ -116,5 +137,11 @@ public class TestReferenceCountedObject {
} catch (IllegalStateException e) {
e.printStackTrace(System.out);
}
+
+ try(UncheckedAutoCloseable ignore = ref.retainAndReleaseOnClose()) {
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
}
}