This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6742a4e  RATIS-1556. Support reference-counted buffer in StateMachine.DataChannel (#626)
6742a4e is described below

commit 6742a4eff7430d41807dcdecb7d0254142ccb974
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Mar 23 15:45:45 2022 +0800

    RATIS-1556. Support reference-counted buffer in StateMachine.DataChannel (#626)
---
 .../apache/ratis/util/ReferenceCountedObject.java  | 118 +++++++++++++++++++++
 .../ratis/netty/server/DataStreamManagement.java   |   8 +-
 .../apache/ratis/statemachine/StateMachine.java    |  36 +++++++
 .../ratis/util/TestReferenceCountedObject.java     | 115 ++++++++++++++++++++
 4 files changed, 273 insertions(+), 4 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
new file mode 100644
index 0000000..8b0c859
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A reference-counted object can be retained for later use
+ * and then be released for returning the resource.
+ *
+ * - When the object is retained, the reference count is incremented by 1.
+ *
+ * - When the object is released, the reference count is decremented by 1.
+ *
+ * - If the object is retained, it must be released afterward.
+ *   Otherwise, the object will not be returned, and it will cause a resource leak.
+ *
+ * - If the object is retained multiple times,
+ *   it must be released the same number of times.
+ *
+ * - If the object has been retained and then completely released (i.e. the reference count becomes 0),
+ *   it must not be retained/released/accessed anymore since it may have been allocated for other use.
+ *
+ * @param <T> The object type.
+ */
+public interface ReferenceCountedObject<T> {
+  /** @return the object. */
+  T get();
+
+  /**
+   * Retain the object for later use.
+   * The reference count will be increased by 1.
+   *
+   * The {@link #release()} method must be invoked afterward.
+   * Otherwise, the object is not returned, and it will cause a resource leak.
+   *
+   * @return the object.
+   */
+  T retain();
+
+  /**
+   * Release the object.
+   * The reference count will be decreased by 1.
+   *
+   * @return true if the object is completely released (i.e. reference count becomes 0); otherwise, return false.
+   */
+  boolean release();
+
+  /**
+   * Wrap the given value as a {@link ReferenceCountedObject}.
+   *
+   * @param value the value being wrapped.
+   * @param retainMethod a method to run when {@link #retain()} is invoked.
+   * @param releaseMethod a method to run when {@link #release()} is invoked.
+   * @param <V> The value type.
+   * @return the wrapped reference-counted object.
+   */
+  static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
+    Objects.requireNonNull(value, "value == null");
+    Objects.requireNonNull(retainMethod, "retainMethod == null");
+    Objects.requireNonNull(releaseMethod, "releaseMethod == null");
+
+    return new ReferenceCountedObject<V>() {
+      private final AtomicInteger count = new AtomicInteger();
+
+      @Override
+      public V get() {
+        if (count.get() < 0) {
+          throw new IllegalStateException("Failed to get: object has already been completely released.");
+        }
+        return value;
+      }
+
+      @Override
+      public V retain() {
+        // n <  0: exception
+        // n >= 0: n++
+        if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
+          throw new IllegalStateException("Failed to retain: object has already been completely released.");
+        }
+
+        retainMethod.run();
+        return value;
+      }
+
+      @Override
+      public boolean release() {
+        // n <= 0: exception
+        // n >  1: n--
+        // n == 1: n = -1
+        final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
+        if (previous < 0) {
+          throw new IllegalStateException("Failed to release: object has already been completely released.");
+        } else if (previous == 0) {
+          throw new IllegalStateException("Failed to release: object has not yet been retained.");
+        }
+        releaseMethod.run();
+        return previous == 1;
+      }
+    };
+  }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 301a7d9..81c3aa1 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -56,6 +56,7 @@ import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.function.CheckedBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -279,9 +280,7 @@ public class DataStreamManagement {
 
   static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor,
       Function<T, CompletableFuture<T>> function) {
-    final CompletableFuture<T> composed = future.get().thenComposeAsync(function, executor);
-    future.set(composed);
-    return composed;
+    return future.updateAndGet(previous -> previous.thenComposeAsync(function, executor));
   }
 
   static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[] options, DataStream stream,
@@ -294,8 +293,9 @@ public class DataStreamManagement {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
+      final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
       try {
-        byteWritten += channel.write(buffer);
+        byteWritten += channel.write(wrapped);
       } catch (Throwable t) {
         throw new CompletionException(t);
       }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 772cc71..3eae341 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -35,11 +35,13 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -251,6 +253,40 @@ public interface StateMachine extends Closeable {
    */
   interface DataChannel extends WritableByteChannel {
     /**
+     * This method is the same as {@link WritableByteChannel#write(ByteBuffer)}.
+     *
+     * If the implementation has overridden {@link #write(ReferenceCountedObject)},
+     * then it does not have to override this method.
+     */
+    @Override
+    default int write(ByteBuffer buffer) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Similar to {@link #write(ByteBuffer)}
+     * except that the parameter is a {@link ReferenceCountedObject}.
+     *
+     * This is an optional method.
+     * The default implementation is the same as write(referenceCountedBuffer.get()).
+     *
+     * The implementation may choose to override this method in order to retain the buffer for later use.
+     *
+     * - If the buffer is retained, it must be released afterward.
+     *   Otherwise, the buffer will not be returned, and it will cause a memory leak.
+     *
+     * - If the buffer is retained multiple times, it must be released the same number of times.
+     *
+     * - It is safe to access the buffer before this method returns with or without retaining it.
+     *
+     * - If the buffer is not retained but is accessed after this method returns,
+     *   the content of the buffer could possibly be changed unexpectedly, and it will cause data corruption.
+     */
+    default int write(ReferenceCountedObject<ByteBuffer> referenceCountedBuffer) throws IOException {
+      return write(referenceCountedBuffer.get());
+    }
+
+    /**
      * Similar to {@link java.nio.channels.FileChannel#force(boolean)},
      * the underlying implementation should force writing the data and/or metadata to the underlying storage.
      *
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
new file mode 100644
index 0000000..4482121
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestReferenceCountedObject {
+  static void assertValues(
+      AtomicInteger retained, int expectedRetained,
+      AtomicInteger released, int expectedReleased) {
+    Assert.assertEquals("retained", expectedRetained, retained.get());
+    Assert.assertEquals("released", expectedReleased, released.get());
+  }
+
+  static void assertRelease(ReferenceCountedObject<?> ref,
+      AtomicInteger retained, int expectedRetained,
+      AtomicInteger released, int expectedReleased) {
+    final boolean returned = ref.release();
+    assertValues(retained, expectedRetained, released, expectedReleased);
+    Assert.assertEquals(expectedRetained == expectedReleased, returned);
+  }
+
+  @Test(timeout = 1000)
+  public void testWrap() {
+    final String value = "testWrap";
+    final AtomicInteger retained = new AtomicInteger();
+    final AtomicInteger released = new AtomicInteger();
+    final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap(
+        value, retained::getAndIncrement, released::getAndIncrement);
+
+    assertValues(retained, 0, released, 0);
+    Assert.assertEquals(value, ref.get());
+    assertValues(retained, 0, released, 0);
+
+    Assert.assertEquals(value, ref.retain());
+    assertValues(retained, 1, released, 0);
+
+    Assert.assertEquals(value, ref.retain());
+    assertValues(retained, 2, released, 0);
+
+    assertRelease(ref, retained, 2, released, 1);
+
+    Assert.assertEquals(value, ref.retain());
+    assertValues(retained, 3, released, 1);
+
+    assertRelease(ref, retained, 3, released, 2);
+
+    assertRelease(ref, retained, 3, released, 3);
+
+    try {
+      ref.get();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+
+    try {
+      ref.retain();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+
+    try {
+      ref.release();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+  }
+
+  @Test(timeout = 1000)
+  public void testReleaseWithoutRetaining() {
+    final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("", () -> {}, () -> {});
+
+    try {
+      ref.release();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+
+    try {
+      ref.get();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+
+    try {
+      ref.retain();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
+  }
+}

Reply via email to