lhotari commented on code in PR #4196: URL: https://github.com/apache/bookkeeper/pull/4196#discussion_r1478245486
########## bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java: ########## @@ -0,0 +1,1132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ByteProcessor; +import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +/** + * This class visits the possible wrapped child buffers of a Netty {@link ByteBuf} for a given offset and length. + * <p> + * The Netty ByteBuf API does not provide a method to visit the wrapped child buffers. The + * {@link ByteBuf#unwrap()} method is not suitable for this purpose as it loses the + * {@link ByteBuf#readerIndex()} state, resulting in incorrect offset and length information. 
+ * <p> + * Despite Netty not having a public API for visiting the sub buffers, it is possible to achieve this using + * the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} method. This class uses this method to visit the + * wrapped child buffers by providing a suitable {@link ByteBuf} implementation. This implementation supports + * the role of the destination buffer for the getBytes call. It requires implementing the + * {@link ByteBuf#setBytes(int, ByteBuf, int, int)} and {@link ByteBuf#setBytes(int, byte[], int, int)} methods + * and other methods required by getBytes such as {@link ByteBuf#hasArray()}, {@link ByteBuf#hasMemoryAddress()}, + * {@link ByteBuf#nioBufferCount()} and {@link ByteBuf#capacity()}. + * All other methods in the internal ByteBuf implementation are not supported and will throw an exception. + * This is to ensure correctness and to fail fast if some ByteBuf implementation is not following the expected + * and supported interface contract. + */ +public class ByteBufVisitor { + private static final int DEFAULT_VISIT_MAX_DEPTH = 10; + + private ByteBufVisitor() { + // prevent instantiation + } + + /** + * This method traverses the potential nested composite buffers of the provided buffer, given a specific offset and + * length. The traversal continues until it encounters a buffer that is backed by an array or a memory address, + * which allows for the inspection of individual buffer segments without the need for data duplication. + * If no such wrapped buffer is found, the callback function is invoked with the original buffer, offset, + * and length as parameters. 
+ * + * @param buffer the buffer to visit + * @param offset the offset for the buffer + * @param length the length for the buffer + * @param callback the callback to call for each visited buffer + * @param context the context to pass to the callback + */ + public static <T> void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback<T> callback, + T context) { + visitBuffers(buffer, offset, length, callback, context, DEFAULT_VISIT_MAX_DEPTH); + } + + /** + * The callback interface for visiting buffers. + * In case of a heap buffer that is backed by an byte[] array, the visitArray method is called. This + * is due to the internal implementation detail of the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} + * method for heap buffers. + */ + public interface ByteBufVisitorCallback<T> { + void visitBuffer(T context, ByteBuf visitBuffer, int visitIndex, int visitLength); + void visitArray(T context, byte[] visitArray, int visitIndex, int visitLength); + default boolean preferArrayOrMemoryAddress(T context) { + return true; + } + default boolean acceptsMemoryAddress(T context) { + return false; + } + } + + /** + * See @{@link #visitBuffers(ByteBuf, int, int, ByteBufVisitorCallback, Object)}. This method + * allows to specify the maximum depth of recursion for visiting wrapped buffers. 
+ */ + public static <T> void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback<T> callback, + T context, int maxDepth) { + if (length == 0) { + // skip visiting empty buffers + return; + } + InternalContext<T> internalContext = new InternalContext<>(); + internalContext.maxDepth = maxDepth; + internalContext.callbackContext = context; + internalContext.callback = callback; + internalContext.recursivelyVisitBuffers(buffer, offset, length); + } + + private static final int TL_COPY_BUFFER_SIZE = 64 * 1024; + private static final FastThreadLocal<byte[]> TL_COPY_BUFFER = new FastThreadLocal<byte[]>() { + @Override + protected byte[] initialValue() { + return new byte[TL_COPY_BUFFER_SIZE]; + } + }; + + private static class InternalContext<T> { + int depth; + int maxDepth; + ByteBuf parentBuffer; + int parentOffset; + int parentLength; + T callbackContext; + ByteBufVisitorCallback<T> callback; + GetBytesCallbackByteBuf<T> callbackByteBuf = new GetBytesCallbackByteBuf(this); + + void recursivelyVisitBuffers(ByteBuf visitBuffer, int visitIndex, int visitLength) { + // visit the wrapped buffers recursively if the buffer is not backed by an array or memory address + // and the max depth has not been reached + if (depth < maxDepth && !visitBuffer.hasMemoryAddress() && !visitBuffer.hasArray()) { + parentBuffer = visitBuffer; + parentOffset = visitIndex; + parentLength = visitLength; + depth++; + // call getBytes to trigger the wrapped buffer visit + visitBuffer.getBytes(visitIndex, callbackByteBuf, 0, visitLength); + depth--; + } else { + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } + } + + void handleBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty buffers + return; + } + if (visitBuffer == parentBuffer && visitIndex == parentOffset && visitLength == parentLength) { + // further recursion would cause unnecessary recursion up to the max depth of recursion + 
+ passBufferToCallback(visitBuffer, visitIndex, visitLength); + } else { + // use the doRecursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively + recursivelyVisitBuffers(visitBuffer, visitIndex, visitLength); + } + } + + private void passBufferToCallback(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (callback.preferArrayOrMemoryAddress(callbackContext)) { + if (visitBuffer.hasArray()) { + handleArray(visitBuffer.array(), visitBuffer.arrayOffset() + visitIndex, visitLength); + } else if (visitBuffer.hasMemoryAddress() && callback.acceptsMemoryAddress(callbackContext)) { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } else if (callback.acceptsMemoryAddress(callbackContext) && visitBuffer.isDirect() + && visitBuffer.alloc().isDirectBufferPooled()) { + // read-only buffers need to be copied before they can be directly accessed + ByteBuf copyBuffer = visitBuffer.copy(visitIndex, visitLength); Review Comment: When execution reaches this branch, the source buffer (`visitBuffer`) has to be copied in any case. This branch has already checked two things: the source buffer is a direct buffer, and the callback accepts a memory-address (direct) buffer. Given that, it's better to make a direct copy than to copy through the thread-local copy buffer (byte[]). Copying the source buffer through the byte[] buffer should remain the last resort. A copy is unavoidable only here, because Netty won't allow visiting the backing direct buffer or backing array of a read-only buffer. In all other cases, the ByteBufVisitor can visit all source buffers without making copies, so this is the most efficient solution available. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
