hangc0276 commented on a change in pull request #2932: URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r826557707
########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java ########## @@ -0,0 +1,318 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.bookkeeper.common.util.nativeio.NativeIO; +import org.apache.bookkeeper.common.util.nativeio.NativeIOException; +import org.apache.bookkeeper.slogger.Slogger; +import org.apache.commons.lang3.SystemUtils; + +class DirectWriter implements LogWriter { + final NativeIO nativeIO; + final int fd; + final int id; + final String filename; + final BufferPool bufferPool; + final ExecutorService writeExecutor; + final Object bufferLock = new Object(); + final List<Future<?>> 
outstandingWrites = new ArrayList<Future<?>>(); + Buffer nativeBuffer; + long offset; + private static volatile boolean useFallocate = true; + + DirectWriter(int id, + String filename, + long maxFileSize, + ExecutorService writeExecutor, + BufferPool bufferPool, + NativeIO nativeIO, Slogger slog) throws IOException { + checkArgument(maxFileSize > 0, "Max file size (%d) must be positive"); + this.id = id; + this.filename = filename; + this.writeExecutor = writeExecutor; + this.nativeIO = nativeIO; + + offset = 0; + + try { + fd = nativeIO.open(filename, + NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT, + 00755); + checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd); + } catch (NativeIOException ne) { + throw new IOException(exMsg(ne.getMessage()).kv("file", filename) + .kv("errno", ne.getErrno()).toString(), ne); + } + + if (useFallocate) { + if (!SystemUtils.IS_OS_LINUX) { + useFallocate = false; + slog.warn(Events.FALLOCATE_NOT_AVAILABLE); + } else { + try { + int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize); + checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret); + } catch (NativeIOException ex) { + // fallocate(2) is not supported on all filesystems. Since this is an optimization, disable + // subsequent usage instead of failing the operation. 
+ useFallocate = false; + slog.kv("message", ex.getMessage()) + .kv("file", filename) + .kv("errno", ex.getErrno()) + .warn(Events.FALLOCATE_NOT_AVAILABLE); + } + } + } + + this.bufferPool = bufferPool; + this.nativeBuffer = bufferPool.acquire(); + } + + @Override + public int logId() { + return id; + } + + @Override + public void writeAt(long offset, ByteBuf buf) throws IOException { + checkArgument(Buffer.isAligned(offset), + "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset); + checkArgument(Buffer.isAligned(buf.readableBytes()), + "Buffer must write multiple of alignment bytes (%d), %d is not", + Buffer.ALIGNMENT, buf.readableBytes()); + Buffer tmpBuffer = bufferPool.acquire(); + int bytesToWrite = buf.readableBytes(); + tmpBuffer.reset(); + tmpBuffer.writeByteBuf(buf); + Future<?> f = writeExecutor.submit(() -> { + try { + int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset); + if (ret != bytesToWrite) { + throw new IOException(exMsg("Incomplete write") + .kv("filename", filename) + .kv("writeSize", bytesToWrite) + .kv("bytesWritten", ret) + .kv("offset", offset).toString()); + } + } catch (NativeIOException ne) { + throw new IOException(exMsg("Write error") + .kv("filename", filename) + .kv("writeSize", bytesToWrite) + .kv("errno", ne.getErrno()) + .kv("offset", offset).toString()); + } finally { + bufferPool.release(tmpBuffer); + } + return null; + }); + addOutstandingWrite(f); + } + + @Override + public int writeDelimited(ByteBuf buf) throws IOException { + synchronized (bufferLock) { + if (!nativeBuffer.hasSpace(serializedSize(buf))) { + flushBuffer(); Review comment: The flush operation will be triggered when the nativeBuffer has no remaining space. However, if our disk is SSD, it will have better performance for smaller IO than bigger IO. Our default nativeBuffer size is `0.125 * direct memory / number of ledger directories / 8`, and could be configured by specific parameter. 
It would be better to make the flush threshold configurable via `flushEntrylogBytes`, rather than flushing only when the native buffer is full. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
