This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6742a4e RATIS-1556. Support reference-counted buffer in StateMachine.DataChannel (#626)
6742a4e is described below
commit 6742a4eff7430d41807dcdecb7d0254142ccb974
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Mar 23 15:45:45 2022 +0800
RATIS-1556. Support reference-counted buffer in StateMachine.DataChannel (#626)
---
.../apache/ratis/util/ReferenceCountedObject.java | 118 +++++++++++++++++++++
.../ratis/netty/server/DataStreamManagement.java | 8 +-
.../apache/ratis/statemachine/StateMachine.java | 36 +++++++
.../ratis/util/TestReferenceCountedObject.java | 115 ++++++++++++++++++++
4 files changed, 273 insertions(+), 4 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
new file mode 100644
index 0000000..8b0c859
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A reference-counted object can be retained for later use
+ * and then be released for returning the resource.
+ *
+ * - When the object is retained, the reference count is incremented by 1.
+ *
+ * - When the object is released, the reference count is decremented by 1.
+ *
+ * - If the object is retained, it must be released afterward.
+ * Otherwise, the object will not be returned, and it will cause a resource leak.
+ *
+ * - If the object is retained multiple times,
+ * it must be released the same number of times.
+ *
+ * - If the object has been retained and then completely released (i.e. the reference count becomes 0),
+ * it must not be retained/released/accessed anymore since it may have been allocated for other use.
+ *
+ * @param <T> The object type.
+ */
+public interface ReferenceCountedObject<T> {
+ /** @return the object. */
+ T get();
+
+ /**
+ * Retain the object for later use.
+ * The reference count will be increased by 1.
+ *
+ * The {@link #release()} method must be invoked afterward.
+ * Otherwise, the object is not returned, and it will cause a resource leak.
+ *
+ * @return the object.
+ */
+ T retain();
+
+ /**
+ * Release the object.
+ * The reference count will be decreased by 1.
+ *
+ * @return true if the object is completely released (i.e. reference count becomes 0); otherwise, return false.
+ */
+ boolean release();
+
+ /**
+ * Wrap the given value as a {@link ReferenceCountedObject}.
+ *
+ * @param value the value being wrapped.
+ * @param retainMethod a method to run when {@link #retain()} is invoked.
+ * @param releaseMethod a method to run when {@link #release()} is invoked.
+ * @param <V> The value type.
+ * @return the wrapped reference-counted object.
+ */
+ static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
+ Objects.requireNonNull(value, "value == null");
+ Objects.requireNonNull(retainMethod, "retainMethod == null");
+ Objects.requireNonNull(releaseMethod, "releaseMethod == null");
+
+ return new ReferenceCountedObject<V>() {
+ private final AtomicInteger count = new AtomicInteger();
+
+ @Override
+ public V get() {
+ if (count.get() < 0) {
+ throw new IllegalStateException("Failed to get: object has already been completely released.");
+ }
+ return value;
+ }
+
+ @Override
+ public V retain() {
+ // n < 0: exception
+ // n >= 0: n++
+ if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
+ throw new IllegalStateException("Failed to retain: object has already been completely released.");
+ }
+
+ retainMethod.run();
+ return value;
+ }
+
+ @Override
+ public boolean release() {
+ // n <= 0: exception
+ // n > 1: n--
+ // n == 1: n = -1
+ final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
+ if (previous < 0) {
+ throw new IllegalStateException("Failed to release: object has already been completely released.");
+ } else if (previous == 0) {
+ throw new IllegalStateException("Failed to release: object has not yet been retained.");
+ }
+ releaseMethod.run();
+ return previous == 1;
+ }
+ };
+ }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 301a7d9..81c3aa1 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -56,6 +56,7 @@ import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,9 +280,7 @@ public class DataStreamManagement {
static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor,
Function<T, CompletableFuture<T>> function) {
- final CompletableFuture<T> composed = future.get().thenComposeAsync(function, executor);
- future.set(composed);
- return composed;
+ return future.updateAndGet(previous -> previous.thenComposeAsync(function, executor));
}
static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[] options, DataStream stream,
@@ -294,8 +293,9 @@ public class DataStreamManagement {
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
+ final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
try {
- byteWritten += channel.write(buffer);
+ byteWritten += channel.write(wrapped);
} catch (Throwable t) {
throw new CompletionException(t);
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 772cc71..3eae341 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -35,11 +35,13 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -251,6 +253,40 @@ public interface StateMachine extends Closeable {
*/
interface DataChannel extends WritableByteChannel {
/**
+ * This method is the same as {@link WritableByteChannel#write(ByteBuffer)}.
+ *
+ * If the implementation has overridden {@link #write(ReferenceCountedObject)},
+ * then it does not have to override this method.
+ */
+ @Override
+ default int write(ByteBuffer buffer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Similar to {@link #write(ByteBuffer)}
+ * except that the parameter is a {@link ReferenceCountedObject}.
+ *
+ * This is an optional method.
+ * The default implementation is the same as write(referenceCountedBuffer.get()).
+ *
+ * The implementation may choose to override this method in order to retain the buffer for later use.
+ *
+ * - If the buffer is retained, it must be released afterward.
+ * Otherwise, the buffer will not be returned, and it will cause a memory leak.
+ *
+ * - If the buffer is retained multiple times, it must be released the same number of times.
+ *
+ * - It is safe to access the buffer before this method returns with or without retaining it.
+ *
+ * - If the buffer is not retained but is accessed after this method returns,
+ * the content of the buffer could possibly be changed unexpectedly, and it will cause data corruption.
+ */
+ default int write(ReferenceCountedObject<ByteBuffer> referenceCountedBuffer) throws IOException {
+ return write(referenceCountedBuffer.get());
+ }
+
+ /**
* Similar to {@link java.nio.channels.FileChannel#force(boolean)},
* the underlying implementation should force writing the data and/or metadata to the underlying storage.
*
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
new file mode 100644
index 0000000..4482121
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestReferenceCountedObject {
+ static void assertValues(
+ AtomicInteger retained, int expectedRetained,
+ AtomicInteger released, int expectedReleased) {
+ Assert.assertEquals("retained", expectedRetained, retained.get());
+ Assert.assertEquals("released", expectedReleased, released.get());
+ }
+
+ static void assertRelease(ReferenceCountedObject<?> ref,
+ AtomicInteger retained, int expectedRetained,
+ AtomicInteger released, int expectedReleased) {
+ final boolean returned = ref.release();
+ assertValues(retained, expectedRetained, released, expectedReleased);
+ Assert.assertEquals(expectedRetained == expectedReleased, returned);
+ }
+
+ @Test(timeout = 1000)
+ public void testWrap() {
+ final String value = "testWrap";
+ final AtomicInteger retained = new AtomicInteger();
+ final AtomicInteger released = new AtomicInteger();
+ final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap(
+ value, retained::getAndIncrement, released::getAndIncrement);
+
+ assertValues(retained, 0, released, 0);
+ Assert.assertEquals(value, ref.get());
+ assertValues(retained, 0, released, 0);
+
+ Assert.assertEquals(value, ref.retain());
+ assertValues(retained, 1, released, 0);
+
+ Assert.assertEquals(value, ref.retain());
+ assertValues(retained, 2, released, 0);
+
+ assertRelease(ref, retained, 2, released, 1);
+
+ Assert.assertEquals(value, ref.retain());
+ assertValues(retained, 3, released, 1);
+
+ assertRelease(ref, retained, 3, released, 2);
+
+ assertRelease(ref, retained, 3, released, 3);
+
+ try {
+ ref.get();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+
+ try {
+ ref.retain();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+
+ try {
+ ref.release();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ @Test(timeout = 1000)
+ public void testReleaseWithoutRetaining() {
+ final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("", () -> {}, () -> {});
+
+ try {
+ ref.release();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+
+ try {
+ ref.get();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+
+ try {
+ ref.retain();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+}