This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fcaea8db RATIS-1933. Two improvements on ReferenceCountedObject. (#963)
0fcaea8db is described below

commit 0fcaea8db16bba45e555632cdb7f975ad883f333
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Nov 10 08:43:55 2023 -0800

    RATIS-1933. Two improvements on ReferenceCountedObject. (#963)
---
 .../apache/ratis/util/ReferenceCountedObject.java  | 63 ++++++++++++++++++----
 .../function/UncheckedAutoCloseableSupplier.java   | 30 +++++++++++
 .../ratis/netty/server/DataStreamManagement.java   |  9 ++--
 .../ratis/util/TestReferenceCountedObject.java     | 43 ++++++++++++---
 4 files changed, 121 insertions(+), 24 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index fec82f099..0dd378dc0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -17,23 +17,27 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 /**
  * A reference-counted object can be retained for later use
  * and then be released for returning the resource.
- *
+ * <p>
  * - When the object is retained, the reference count is incremented by 1.
- *
+ * <p>
  * - When the object is released, the reference count is decremented by 1.
- *
+ * <p>
  * - If the object is retained, it must be released afterward.
  *   Otherwise, the object will not be returned, and it will cause a resource 
leak.
- *
+ * <p>
  * - If the object is retained multiple times,
  *   it must be released the same number of times.
- *
+ * <p>
  * - If the object has been retained and then completely released (i.e. the 
reference count becomes 0),
  *   it must not be retained/released/accessed anymore since it may have been 
allocated for other use.
  *
@@ -46,7 +50,7 @@ public interface ReferenceCountedObject<T> {
   /**
    * Retain the object for later use.
    * The reference count will be increased by 1.
-   *
+   * <p>
    * The {@link #release()} method must be invoked afterward.
    * Otherwise, the object is not returned, and it will cause a resource leak.
    *
@@ -54,6 +58,36 @@ public interface ReferenceCountedObject<T> {
    */
   T retain();
 
+  /**
+   * The same as {@link #retain()} except that this method returns a {@link 
UncheckedAutoCloseableSupplier}.
+   *
+   * @return a {@link UncheckedAutoCloseableSupplier}
+   *         where {@link java.util.function.Supplier#get()} will return the 
retained object,
+   *         i.e. the object returned by {@link #retain()},
+   *         and calling {@link UncheckedAutoCloseable#close()} one or more 
times
+   *         is the same as calling {@link #release()} once (idempotent).
+   */
+  default UncheckedAutoCloseableSupplier<T> retainAndReleaseOnClose() {
+    final T retained = retain();
+    final AtomicBoolean closed = new AtomicBoolean();
+    return new UncheckedAutoCloseableSupplier<T>() {
+      @Override
+      public T get() {
+        if (closed.get()) {
+          throw new IllegalStateException("Already closed");
+        }
+        return retained;
+      }
+
+      @Override
+      public void close() {
+        if (closed.compareAndSet(false, true)) {
+          release();
+        }
+      }
+    };
+  }
+
   /**
    * Release the object.
    * The reference count will be decreased by 1.
@@ -64,7 +98,7 @@ public interface ReferenceCountedObject<T> {
 
   /** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
   static <V> ReferenceCountedObject<V> wrap(V value) {
-    return wrap(value, () -> {}, () -> {});
+    return wrap(value, () -> {}, ignored -> {});
   }
 
   /**
@@ -72,11 +106,12 @@ public interface ReferenceCountedObject<T> {
    *
    * @param value the value being wrapped.
    * @param retainMethod a method to run when {@link #retain()} is invoked.
-   * @param releaseMethod a method to run when {@link #release()} is invoked.
+   * @param releaseMethod a method to run when {@link #release()} is invoked,
+   *                      where the method takes a boolean which is the same 
as the one returned by {@link #release()}.
    * @param <V> The value type.
    * @return the wrapped reference-counted object.
    */
-  static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, 
Runnable releaseMethod) {
+  static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, 
Consumer<Boolean> releaseMethod) {
     Objects.requireNonNull(value, "value == null");
     Objects.requireNonNull(retainMethod, "retainMethod == null");
     Objects.requireNonNull(releaseMethod, "releaseMethod == null");
@@ -118,9 +153,15 @@ public interface ReferenceCountedObject<T> {
         } else if (previous == 0) {
           throw new IllegalStateException("Failed to release: object has not 
yet been retained.");
         }
-        releaseMethod.run();
-        return previous == 1;
+        final boolean completedReleased = previous == 1;
+        releaseMethod.accept(completedReleased);
+        return completedReleased;
       }
     };
   }
+
+  /** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
+  static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, 
Runnable releaseMethod) {
+    return wrap(value, retainMethod, ignored -> releaseMethod.run());
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/UncheckedAutoCloseableSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/function/UncheckedAutoCloseableSupplier.java
new file mode 100644
index 000000000..de4c1eff4
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/UncheckedAutoCloseableSupplier.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import org.apache.ratis.util.UncheckedAutoCloseable;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link Supplier} which is also {@link UncheckedAutoCloseable}.
+ *
+ * @param <T> the type of the {@link Supplier}.
+ */
+public interface UncheckedAutoCloseableSupplier<T> extends 
UncheckedAutoCloseable, Supplier<T> {
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index dd9a49929..695cc9645 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -59,6 +59,7 @@ import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.apache.ratis.util.function.CheckedBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -327,14 +328,12 @@ public class DataStreamManagement {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
-      final ReferenceCountedObject<ByteBuffer> wrapped = 
ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
-      wrapped.retain();
-      try {
+      final ReferenceCountedObject<ByteBuffer> wrapped = 
ReferenceCountedObject.wrap(
+          buffer, buf::retain, ignored -> buf.release());
+      try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
         byteWritten += channel.write(wrapped);
       } catch (Throwable t) {
         throw new CompletionException(t);
-      } finally {
-        wrapped.release();
       }
     }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
index 5a855857a..fe58b92da 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,17 +59,31 @@ public class TestReferenceCountedObject {
     Assert.assertEquals(value, ref.retain());
     assertValues(retained, 1, released, 0);
 
-    Assert.assertEquals(value, ref.retain());
-    assertValues(retained, 2, released, 0);
-
-    assertRelease(ref, retained, 2, released, 1);
+    try(UncheckedAutoCloseableSupplier<String> auto = 
ref.retainAndReleaseOnClose()) {
+      final String got = auto.get();
+      Assert.assertEquals(value, got);
+      Assert.assertSame(got, auto.get()); // it should return the same object.
+      assertValues(retained, 2, released, 0);
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+    assertValues(retained, 2, released, 1);
 
-    Assert.assertEquals(value, ref.retain());
+    final UncheckedAutoCloseableSupplier<String> notClosing = 
ref.retainAndReleaseOnClose();
+    Assert.assertEquals(value, notClosing.get());
     assertValues(retained, 3, released, 1);
-
     assertRelease(ref, retained, 3, released, 2);
 
-    assertRelease(ref, retained, 3, released, 3);
+    final UncheckedAutoCloseableSupplier<String> auto = 
ref.retainAndReleaseOnClose();
+    Assert.assertEquals(value, auto.get());
+    assertValues(retained, 4, released, 2);
+    auto.close();
+    assertValues(retained, 4, released, 3);
+    auto.close();  // close() is idempotent.
+    assertValues(retained, 4, released, 3);
+
+    // completely released
+    assertRelease(ref, retained, 4, released, 4);
 
     try {
       ref.get();
@@ -84,6 +99,12 @@ public class TestReferenceCountedObject {
       e.printStackTrace(System.out);
     }
 
+    try(UncheckedAutoCloseable ignore = ref.retainAndReleaseOnClose()) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+
     try {
       ref.release();
       Assert.fail();
@@ -94,7 +115,7 @@ public class TestReferenceCountedObject {
 
   @Test(timeout = 1000)
   public void testReleaseWithoutRetaining() {
-    final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("", 
() -> {}, () -> {});
+    final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("");
 
     try {
       ref.release();
@@ -116,5 +137,11 @@ public class TestReferenceCountedObject {
     } catch (IllegalStateException e) {
       e.printStackTrace(System.out);
     }
+
+    try(UncheckedAutoCloseable ignore = ref.retainAndReleaseOnClose()) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
   }
 }

Reply via email to