This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit ba045c84a291619b49493d8b974c1e3bb073958a Author: Qiang Huang <[email protected]> AuthorDate: Sun Dec 4 19:32:11 2022 +0800 [refactor][bookkeeper] Refactor ByteBuf release method in stream/statelib (#3689) * refactor ByteBuf release usage (cherry picked from commit 324b8d43ab47f1a9bf681abaa8ab486c00d331a5) --- .../statelib/impl/journal/AbstractStateStoreWithJournal.java | 5 +++-- .../main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java | 3 ++- .../org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java | 5 +++-- .../java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java | 5 +++-- .../org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java | 3 ++- .../java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java | 3 ++- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java index a43fcf866c..600c9bc346 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.time.Duration; import java.util.concurrent.Callable; @@ -548,7 +549,7 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta long txId = ++nextRevision; return FutureUtils.ensure( writer.write(new LogRecord(txId, cmdBuf.nioBuffer())), - () -> cmdBuf.release()); + () -> ReferenceCountUtil.safeRelease(cmdBuf)); } protected synchronized CompletableFuture<Long> writeCommandBufReturnTxId(ByteBuf cmdBuf) { @@ -556,7 +557,7 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta return FutureUtils.ensure( writer.write(new LogRecord(txId, cmdBuf.nioBuffer())) .thenApply(dlsn -> txId), - () -> cmdBuf.release()); + () -> ReferenceCountUtil.safeRelease(cmdBuf)); } // diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java index 16d647edd0..aeac76d153 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java @@ -23,6 +23,7 @@ import com.google.protobuf.UnsafeByteOperations; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -76,7 +77,7 @@ final class KVUtils { try { cmd.writeTo(new ByteBufOutputStream(buf)); } catch (IOException e) { - buf.release(); + ReferenceCountUtil.safeRelease(buf); throw e; } return buf; diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java index 4252ddec6e..0bceee6a09 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.statelib.impl.kv; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -99,7 +100,7 @@ public class RocksdbKVAsyncStore<K, V> byte[] serializedBytes = ByteBufUtil.getBytes(serializedBuf); localStore.put(keyBytes, serializedBytes, revision); } finally { - serializedBuf.release(); + ReferenceCountUtil.safeRelease(serializedBuf); } return null; }, writeIOScheduler); @@ -125,7 +126,7 @@ public class RocksdbKVAsyncStore<K, V> return KVUtils.deserialize(valCoder, Unpooled.wrappedBuffer(prevValue)); } } finally { - serializedBuf.release(); + ReferenceCountUtil.safeRelease(serializedBuf); } }, writeIOScheduler); } diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java index b1cf0fe8fc..762adcf1e1 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.statelib.impl.mvcc; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; import java.util.function.Predicate; import lombok.Data; import lombok.Getter; @@ -87,7 +88,7 @@ public class MVCCRecord implements Recycled, Predicate<RangeOption<?>> { public void setValue(ByteBuf buf, ValueType valueType) { if (null != value) { - value.release(); + ReferenceCountUtil.safeRelease(value); } this.value = buf; this.valueType = valueType; @@ -98,7 +99,7 @@ public class MVCCRecord implements Recycled, Predicate<RangeOption<?>> { private void reset() { if (null != value) { - value.release(); + ReferenceCountUtil.safeRelease(value); value = null; } modRev = -1L; diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java index 826f92103b..11a5e1ad7a 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java @@ -22,6 +22,7 @@ import com.google.protobuf.CodedOutputStream; import com.google.protobuf.InvalidProtocolBufferException; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.nio.ByteBuffer; import lombok.AccessLevel; @@ -74,7 +75,7 @@ final class MVCCRecordCoder implements Coder<MVCCRecord> { buf.writerIndex(buf.writerIndex() + metaLen); buf.writeInt(valLen); buf.writeBytes(record.getValue().slice()); - buf.release(); + ReferenceCountUtil.safeRelease(buf); return data; } diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java index 74d862861a..9393541979 100644 --- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java +++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java @@ -30,6 +30,7 @@ import com.google.protobuf.TextFormat; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -490,7 +491,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V try { record = getKeyRecord(key, rawKey); } catch (StateStoreRuntimeException e) { - rawValBuf.release(); + ReferenceCountUtil.safeRelease(rawValBuf); throw e; }
