lhotari commented on code in PR #4196:
URL: https://github.com/apache/bookkeeper/pull/4196#discussion_r1478245486


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java:
##########
@@ -0,0 +1,1132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ByteProcessor;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+
+/**
+ * This class visits the possible wrapped child buffers of a Netty {@link 
ByteBuf} for a given offset and length.
+ * <p>
+ * The Netty ByteBuf API does not provide a method to visit the wrapped child 
buffers. The
+ * {@link ByteBuf#unwrap()} method is not suitable for this purpose as it 
loses the
+ * {@link ByteBuf#readerIndex()} state, resulting in incorrect offset and 
length information.
+ * <p>
+ * Despite Netty not having a public API for visiting the sub buffers, it is 
possible to achieve this using
+ * the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} method. This class 
uses this method to visit the
+ * wrapped child buffers by providing a suitable {@link ByteBuf} 
implementation. This implementation supports
+ * the role of the destination buffer for the getBytes call. It requires 
implementing the
+ * {@link ByteBuf#setBytes(int, ByteBuf, int, int)} and {@link 
ByteBuf#setBytes(int, byte[], int, int)} methods
+ * and other methods required by getBytes such as {@link ByteBuf#hasArray()}, 
{@link ByteBuf#hasMemoryAddress()},
+ * {@link ByteBuf#nioBufferCount()} and {@link ByteBuf#capacity()}.
+ * All other methods in the internal ByteBuf implementation are not supported 
and will throw an exception.
+ * This is to ensure correctness and to fail fast if some ByteBuf 
implementation is not following the expected
+ * and supported interface contract.
+ */
+public class ByteBufVisitor {
+    private static final int DEFAULT_VISIT_MAX_DEPTH = 10;
+
+    private ByteBufVisitor() {
+        // prevent instantiation
+    }
+
+    /**
+     * This method traverses the potential nested composite buffers of the 
provided buffer, given a specific offset and
+     * length. The traversal continues until it encounters a buffer that is 
backed by an array or a memory address,
+     * which allows for the inspection of individual buffer segments without 
the need for data duplication.
+     * If no such wrapped buffer is found, the callback function is invoked 
with the original buffer, offset,
+     * and length as parameters.
+     *
+     * @param buffer   the buffer to visit
+     * @param offset   the offset for the buffer
+     * @param length   the length for the buffer
+     * @param callback the callback to call for each visited buffer
+     * @param context  the context to pass to the callback
+     */
+    public static <T> void visitBuffers(ByteBuf buffer, int offset, int 
length, ByteBufVisitorCallback<T> callback,
+                                        T context) {
+        visitBuffers(buffer, offset, length, callback, context, 
DEFAULT_VISIT_MAX_DEPTH);
+    }
+
+    /**
+     * The callback interface for visiting buffers.
+     * In case of a heap buffer that is backed by a byte[] array, the 
visitArray method is called. This
+     * is due to the internal implementation detail of the {@link 
ByteBuf#getBytes(int, ByteBuf, int, int)}
+     * method for heap buffers.
+     */
+    public interface ByteBufVisitorCallback<T> {
+        void visitBuffer(T context, ByteBuf visitBuffer, int visitIndex, int 
visitLength);
+        void visitArray(T context, byte[] visitArray, int visitIndex, int 
visitLength);
+        default boolean preferArrayOrMemoryAddress(T context) {
+            return true;
+        }
+        default boolean acceptsMemoryAddress(T context) {
+            return false;
+        }
+    }
+
+    /**
+     * See {@link #visitBuffers(ByteBuf, int, int, ByteBufVisitorCallback, 
Object)}. This method
+     * allows to specify the maximum depth of recursion for visiting wrapped 
buffers.
+     */
+    public static <T> void visitBuffers(ByteBuf buffer, int offset, int 
length, ByteBufVisitorCallback<T> callback,
+                                        T context, int maxDepth) {
+        if (length == 0) {
+            // skip visiting empty buffers
+            return;
+        }
+        InternalContext<T> internalContext = new InternalContext<>();
+        internalContext.maxDepth = maxDepth;
+        internalContext.callbackContext = context;
+        internalContext.callback = callback;
+        internalContext.recursivelyVisitBuffers(buffer, offset, length);
+    }
+
+    private static final int TL_COPY_BUFFER_SIZE = 64 * 1024;
+    private static final FastThreadLocal<byte[]> TL_COPY_BUFFER = new 
FastThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+            return new byte[TL_COPY_BUFFER_SIZE];
+        }
+    };
+
+    private static class InternalContext<T> {
+        int depth;
+        int maxDepth;
+        ByteBuf parentBuffer;
+        int parentOffset;
+        int parentLength;
+        T callbackContext;
+        ByteBufVisitorCallback<T> callback;
+        GetBytesCallbackByteBuf<T> callbackByteBuf = new 
GetBytesCallbackByteBuf(this);
+
+        void recursivelyVisitBuffers(ByteBuf visitBuffer, int visitIndex, int 
visitLength) {
+            // visit the wrapped buffers recursively if the buffer is not 
backed by an array or memory address
+            // and the max depth has not been reached
+            if (depth < maxDepth && !visitBuffer.hasMemoryAddress() && 
!visitBuffer.hasArray()) {
+                parentBuffer = visitBuffer;
+                parentOffset = visitIndex;
+                parentLength = visitLength;
+                depth++;
+                // call getBytes to trigger the wrapped buffer visit
+                visitBuffer.getBytes(visitIndex, callbackByteBuf, 0, 
visitLength);
+                depth--;
+            } else {
+                passBufferToCallback(visitBuffer, visitIndex, visitLength);
+            }
+        }
+
+        void handleBuffer(ByteBuf visitBuffer, int visitIndex, int 
visitLength) {
+            if (visitLength == 0) {
+                // skip visiting empty buffers
+                return;
+            }
+            if (visitBuffer == parentBuffer && visitIndex == parentOffset && 
visitLength == parentLength) {
+                // further recursion would cause unnecessary recursion up to 
the max depth of recursion
+                passBufferToCallback(visitBuffer, visitIndex, visitLength);
+            } else {
+                // use the recursivelyVisitBuffers method to visit the 
wrapped buffer, possibly recursively
+                recursivelyVisitBuffers(visitBuffer, visitIndex, visitLength);
+            }
+        }
+
+        private void passBufferToCallback(ByteBuf visitBuffer, int visitIndex, 
int visitLength) {
+            if (callback.preferArrayOrMemoryAddress(callbackContext)) {
+                if (visitBuffer.hasArray()) {
+                    handleArray(visitBuffer.array(), visitBuffer.arrayOffset() 
+ visitIndex, visitLength);
+                } else if (visitBuffer.hasMemoryAddress() && 
callback.acceptsMemoryAddress(callbackContext)) {
+                    callback.visitBuffer(callbackContext, visitBuffer, 
visitIndex, visitLength);
+                } else if (callback.acceptsMemoryAddress(callbackContext) && 
visitBuffer.isDirect()
+                        && visitBuffer.alloc().isDirectBufferPooled()) {
+                    // read-only buffers need to be copied before they can be 
directly accessed
+                    ByteBuf copyBuffer = visitBuffer.copy(visitIndex, 
visitLength);

Review Comment:
   When it gets here, the source buffer (`visitBuffer`) will need to be copied 
in any case.
   Since this checks that the source buffer is a direct buffer and that the 
callback accepts a memory address buffer (direct buffer), it's better to make a 
direct copy than to use the thread local copy buffer (byte[]) for copying.
   
   As a last resort, the source buffer is copied through the thread-local byte[] copy buffer.
   
   Netty does not allow visiting the backing direct buffer or backing array 
when the buffer is a read-only buffer. In all other cases, the ByteBufVisitor 
is able to visit all source buffers without copies. This is the most efficient 
solution available.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to