hangc0276 commented on a change in pull request #2932: URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r826054869
########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java ########## @@ -0,0 +1,508 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX; +import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +import org.apache.bookkeeper.bookie.AbstractLogCompactor; +import org.apache.bookkeeper.bookie.Bookie.NoEntryException; +import org.apache.bookkeeper.bookie.EntryLogMetadata; +import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; +import org.apache.bookkeeper.bookie.storage.EntryLogIds; +import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLoggerIface; +import org.apache.bookkeeper.common.util.nativeio.NativeIO; +import org.apache.bookkeeper.slogger.Slogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * DirectEntryLogger. + */ +public class DirectEntryLogger implements EntryLoggerIface { + private static final String LOGFILE_SUFFIX = ".log"; + private final Slogger slog; + private final File ledgerDir; + private final EntryLogIds ids; + private final ExecutorService writeExecutor; + private final ExecutorService flushExecutor; + private final long maxFileSize; + private final DirectEntryLoggerStats stats; + private final ByteBufAllocator allocator; + private final BufferPool writeBuffers; + private final int readBufferSize; + private final int maxSaneEntrySize; + private final Set<Integer> unflushedLogs; + + private WriterWithMetadata curWriter; + + private List<Future<?>> pendingFlushes; + private final NativeIO nativeIO; + private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>(); + private final ThreadLocal<Cache<Integer, LogReader>> caches; + + private static final int NUMBER_OF_WRITE_BUFFERS = 8; + + public DirectEntryLogger(File ledgerDir, + EntryLogIds ids, + NativeIO nativeIO, + ByteBufAllocator allocator, + ExecutorService writeExecutor, + ExecutorService flushExecutor, + long maxFileSize, + int maxSaneEntrySize, + 
long totalWriteBufferSize, + long totalReadBufferSize, + int readBufferSize, + int numReadThreads, + int maxFdCacheTimeSeconds, + Slogger slogParent, + StatsLogger stats) throws IOException { + this.ledgerDir = ledgerDir; + this.flushExecutor = flushExecutor; + this.writeExecutor = writeExecutor; + this.pendingFlushes = new ArrayList<>(); + this.nativeIO = nativeIO; + this.unflushedLogs = ConcurrentHashMap.newKeySet(); + + this.maxFileSize = maxFileSize; + this.maxSaneEntrySize = maxSaneEntrySize; + this.readBufferSize = Buffer.nextAlignment(readBufferSize); + this.ids = ids; + this.slog = slogParent.kv("directory", ledgerDir).ctx(); + + this.stats = new DirectEntryLoggerStats(stats); + + this.allocator = allocator; + + int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS)); + this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS); + + // The total read buffer memory needs to get split across all the read threads, since the caches + // are thread-specific and we want to ensure we don't pass the total memory limit. 
+ long perThreadBufferSize = totalReadBufferSize / numReadThreads; + + // if the amount of total read buffer size is too low, and/or the number of read threads is too high + // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers + // from the cache + if (perThreadBufferSize < readBufferSize) { + slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)") + .kv("totalReadBufferSize", totalReadBufferSize) + .kv("totalNumReadThreads", numReadThreads) + .kv("readBufferSize", readBufferSize) + .kv("perThreadBufferSize", perThreadBufferSize) + .error(Events.ENTRYLOGGER_MISCONFIGURED); + } + + long maxCachedReadersPerThread = perThreadBufferSize / readBufferSize; + long maxCachedReaders = maxCachedReadersPerThread * numReadThreads; + + this.slog + .kv("maxFileSize", maxFileSize) + .kv("maxSaneEntrySize", maxSaneEntrySize) + .kv("totalWriteBufferSize", totalWriteBufferSize) + .kv("singleWriteBufferSize", singleWriteBufferSize) + .kv("totalReadBufferSize", totalReadBufferSize) + .kv("readBufferSize", readBufferSize) + .kv("perThreadBufferSize", perThreadBufferSize) + .kv("maxCachedReadersPerThread", maxCachedReadersPerThread) + .kv("maxCachedReaders", maxCachedReaders) + .info(Events.ENTRYLOGGER_CREATED); + + this.caches = ThreadLocal.withInitial(() -> { + RemovalListener<Integer, LogReader> rl = (notification) -> { + try { + notification.getValue().close(); + this.stats.getCloseReaderCounter().inc(); + } catch (IOException ioe) { + slog.kv("logID", notification.getKey()).error(Events.READER_CLOSE_ERROR); + } + }; + Cache<Integer, LogReader> cache = CacheBuilder.newBuilder() + .maximumWeight(perThreadBufferSize) + .weigher((key, value) -> readBufferSize) + .removalListener(rl) + .expireAfterAccess(maxFdCacheTimeSeconds, TimeUnit.SECONDS) + .concurrencyLevel(1) // important to avoid too aggressive eviction + .build(); + allCaches.add(cache); + return cache; + }); + } + + 
@Override + public long addEntry(long ledgerId, ByteBuf buf) throws IOException { + long start = System.nanoTime(); + int size = buf.readableBytes(); Review comment: The local variable `size` is never used; can we remove it? ########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectWriter.java ########## @@ -0,0 +1,318 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.bookkeeper.common.util.nativeio.NativeIO; +import org.apache.bookkeeper.common.util.nativeio.NativeIOException; +import org.apache.bookkeeper.slogger.Slogger; +import org.apache.commons.lang3.SystemUtils; + +class DirectWriter implements LogWriter { + final NativeIO nativeIO; + final int fd; + final int id; + final String filename; + final BufferPool bufferPool; + final ExecutorService writeExecutor; + final Object bufferLock = new Object(); + final List<Future<?>> outstandingWrites = new ArrayList<Future<?>>(); + Buffer nativeBuffer; + long offset; + private static volatile boolean useFallocate = true; + + DirectWriter(int id, + String filename, + long maxFileSize, + ExecutorService writeExecutor, + BufferPool bufferPool, + NativeIO nativeIO, Slogger slog) throws IOException { + checkArgument(maxFileSize > 0, "Max file size (%d) must be positive"); + this.id = id; + this.filename = filename; + this.writeExecutor = writeExecutor; + this.nativeIO = nativeIO; + + offset = 0; + + try { + fd = nativeIO.open(filename, + NativeIO.O_CREAT | NativeIO.O_WRONLY | NativeIO.O_DIRECT, + 00755); + checkState(fd >= 0, "Open should have thrown exception, fd is invalid : %d", fd); + } catch (NativeIOException ne) { + throw new IOException(exMsg(ne.getMessage()).kv("file", filename) + .kv("errno", ne.getErrno()).toString(), ne); + } + + if (useFallocate) { + if (!SystemUtils.IS_OS_LINUX) { + useFallocate = false; + 
slog.warn(Events.FALLOCATE_NOT_AVAILABLE); + } else { + try { + int ret = nativeIO.fallocate(fd, NativeIO.FALLOC_FL_ZERO_RANGE, 0, maxFileSize); + checkState(ret == 0, "Exception should have been thrown on non-zero ret: %d", ret); + } catch (NativeIOException ex) { + // fallocate(2) is not supported on all filesystems. Since this is an optimization, disable + // subsequent usage instead of failing the operation. + useFallocate = false; + slog.kv("message", ex.getMessage()) + .kv("file", filename) + .kv("errno", ex.getErrno()) + .warn(Events.FALLOCATE_NOT_AVAILABLE); + } + } + } + + this.bufferPool = bufferPool; + this.nativeBuffer = bufferPool.acquire(); + } + + @Override + public int logId() { + return id; + } + + @Override + public void writeAt(long offset, ByteBuf buf) throws IOException { + checkArgument(Buffer.isAligned(offset), + "Offset to writeAt must be aligned to %d: %d is not", Buffer.ALIGNMENT, offset); + checkArgument(Buffer.isAligned(buf.readableBytes()), + "Buffer must write multiple of alignment bytes (%d), %d is not", + Buffer.ALIGNMENT, buf.readableBytes()); + Buffer tmpBuffer = bufferPool.acquire(); + int bytesToWrite = buf.readableBytes(); + tmpBuffer.reset(); + tmpBuffer.writeByteBuf(buf); + Future<?> f = writeExecutor.submit(() -> { + try { + int ret = nativeIO.pwrite(fd, tmpBuffer.pointer(), bytesToWrite, offset); + if (ret != bytesToWrite) { + throw new IOException(exMsg("Incomplete write") + .kv("filename", filename) + .kv("writeSize", bytesToWrite) + .kv("bytesWritten", ret) + .kv("offset", offset).toString()); + } + } catch (NativeIOException ne) { + throw new IOException(exMsg("Write error") + .kv("filename", filename) + .kv("writeSize", bytesToWrite) + .kv("errno", ne.getErrno()) + .kv("offset", offset).toString()); + } finally { + bufferPool.release(tmpBuffer); + } + return null; + }); + addOutstandingWrite(f); + } + + @Override + public int writeDelimited(ByteBuf buf) throws IOException { + synchronized (bufferLock) { + if 
(!nativeBuffer.hasSpace(serializedSize(buf))) { + flushBuffer(); + } + + int readable = buf.readableBytes(); + long bufferPosition = position() + Integer.BYTES; + if (bufferPosition > Integer.MAX_VALUE) { + throw new IOException(exMsg("Cannot write past max int") + .kv("filename", filename) + .kv("writeSize", readable) + .kv("position", bufferPosition) + .toString()); + } + nativeBuffer.writeInt(readable); + nativeBuffer.writeByteBuf(buf); + return (int) bufferPosition; + } + } + + @Override + public void position(long offset) throws IOException { + synchronized (bufferLock) { + if (nativeBuffer != null && nativeBuffer.position() > 0) { + flushBuffer(); + } + if ((offset % Buffer.ALIGNMENT) != 0) { + throw new IOException(exMsg("offset must be multiple of alignment") + .kv("offset", offset) + .kv("alignment", Buffer.ALIGNMENT) + .toString()); + } + this.offset = offset; + } + } + + @Override + public long position() { + synchronized (bufferLock) { + return this.offset + (nativeBuffer != null ? 
nativeBuffer.position() : 0); + } + } + + @Override + public void flush() throws IOException { + flushBuffer(); + + waitForOutstandingWrites(); + + try { + int ret = nativeIO.fsync(fd); + checkState(ret == 0, "Fsync should throw exception on non-zero return (%d)", ret); + } catch (NativeIOException ne) { + throw new IOException(exMsg(ne.getMessage()) + .kv("file", filename) + .kv("errno", ne.getErrno()).toString()); + } + } + + @Override + public void close() throws IOException { + synchronized (bufferLock) { + if (nativeBuffer != null && nativeBuffer.position() > 0) { + flushBuffer(); + } + } + + try { + int ret = nativeIO.close(fd); + checkState(ret == 0, "Close should throw exception on non-zero return (%d)", ret); + } catch (NativeIOException ne) { + throw new IOException(exMsg(ne.getMessage()) + .kv("file", filename) + .kv("errno", ne.getErrno()).toString()); + } + synchronized (bufferLock) { + bufferPool.release(nativeBuffer); + nativeBuffer = null; + } + } + + private void addOutstandingWrite(Future<?> toAdd) throws IOException { + synchronized (outstandingWrites) { + outstandingWrites.add(toAdd); + + Iterator<Future<?>> iter = outstandingWrites.iterator(); + while (iter.hasNext()) { // clear out completed futures + Future<?> f = iter.next(); + if (f.isDone()) { + waitForFuture(f); + iter.remove(); + } else { + break; + } + } + } + } + + private void waitForOutstandingWrites() throws IOException { + synchronized (outstandingWrites) { + Iterator<Future<?>> iter = outstandingWrites.iterator(); + while (iter.hasNext()) { // clear out completed futures + Future<?> f = iter.next(); + waitForFuture(f); + iter.remove(); + } + } + } + + private void waitForFuture(Future<?> f) throws IOException { + try { + f.get(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException(ie); + } catch (Throwable t) { + if (t.getCause() instanceof IOException) { + throw (IOException) t.getCause(); + } else { + throw new IOException(t); + } 
+ } + } + + private void flushBuffer() throws IOException { + synchronized (bufferLock) { + Buffer bufferToFlush = nativeBuffer; + this.nativeBuffer = null; + if (bufferToFlush != null) { + int bytesToWrite = bufferToFlush.padToAlignment(); + long offsetToWrite = offset; + offset += bytesToWrite; + Future<?> f = writeExecutor.submit(() -> { + if (bufferToFlush == null) { Review comment: Do we need this double check? ########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java ########## @@ -0,0 +1,508 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX; +import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +import org.apache.bookkeeper.bookie.AbstractLogCompactor; +import org.apache.bookkeeper.bookie.Bookie.NoEntryException; +import org.apache.bookkeeper.bookie.EntryLogMetadata; +import org.apache.bookkeeper.bookie.storage.CompactionEntryLog; +import org.apache.bookkeeper.bookie.storage.EntryLogIds; +import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl; +import org.apache.bookkeeper.bookie.storage.EntryLogScanner; +import org.apache.bookkeeper.bookie.storage.EntryLoggerIface; +import org.apache.bookkeeper.common.util.nativeio.NativeIO; +import org.apache.bookkeeper.slogger.Slogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * DirectEntryLogger. 
+ */ +public class DirectEntryLogger implements EntryLoggerIface { + private static final String LOGFILE_SUFFIX = ".log"; + private final Slogger slog; + private final File ledgerDir; + private final EntryLogIds ids; + private final ExecutorService writeExecutor; + private final ExecutorService flushExecutor; + private final long maxFileSize; + private final DirectEntryLoggerStats stats; + private final ByteBufAllocator allocator; + private final BufferPool writeBuffers; + private final int readBufferSize; + private final int maxSaneEntrySize; + private final Set<Integer> unflushedLogs; + + private WriterWithMetadata curWriter; + + private List<Future<?>> pendingFlushes; + private final NativeIO nativeIO; + private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>(); + private final ThreadLocal<Cache<Integer, LogReader>> caches; + + private static final int NUMBER_OF_WRITE_BUFFERS = 8; + + public DirectEntryLogger(File ledgerDir, + EntryLogIds ids, + NativeIO nativeIO, + ByteBufAllocator allocator, + ExecutorService writeExecutor, + ExecutorService flushExecutor, + long maxFileSize, + int maxSaneEntrySize, + long totalWriteBufferSize, + long totalReadBufferSize, + int readBufferSize, + int numReadThreads, + int maxFdCacheTimeSeconds, + Slogger slogParent, + StatsLogger stats) throws IOException { + this.ledgerDir = ledgerDir; + this.flushExecutor = flushExecutor; + this.writeExecutor = writeExecutor; + this.pendingFlushes = new ArrayList<>(); + this.nativeIO = nativeIO; + this.unflushedLogs = ConcurrentHashMap.newKeySet(); + + this.maxFileSize = maxFileSize; + this.maxSaneEntrySize = maxSaneEntrySize; + this.readBufferSize = Buffer.nextAlignment(readBufferSize); + this.ids = ids; + this.slog = slogParent.kv("directory", ledgerDir).ctx(); + + this.stats = new DirectEntryLoggerStats(stats); + + this.allocator = allocator; + + int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS)); + this.writeBuffers = 
new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS); + + // The total read buffer memory needs to get split across all the read threads, since the caches + // are thread-specific and we want to ensure we don't pass the total memory limit. + long perThreadBufferSize = totalReadBufferSize / numReadThreads; + + // if the amount of total read buffer size is too low, and/or the number of read threads is too high + // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers + // from the cache + if (perThreadBufferSize < readBufferSize) { Review comment: use aligned `this.readBufferSize` instead of original `readBufferSize`? ########## File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java ########## @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.directentrylogger; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.bookkeeper.common.util.nativeio.NativeIO; + +/** + * BufferPool. 
+ */ +public class BufferPool implements AutoCloseable { + private final int maxPoolSize; Review comment: The field `maxPoolSize` is never used; could we remove it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
