mauricebarnum commented on a change in pull request #2932:
URL: https://github.com/apache/bookkeeper/pull/2932#discussion_r830247077



##########
File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
##########
@@ -0,0 +1,508 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+import org.apache.bookkeeper.bookie.AbstractLogCompactor;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogIds;
+import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLoggerIface;
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+import org.apache.bookkeeper.slogger.Slogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * DirectEntryLogger.
+ */
+public class DirectEntryLogger implements EntryLoggerIface {
+    private static final String LOGFILE_SUFFIX = ".log";
+    private final Slogger slog;
+    private final File ledgerDir;
+    private final EntryLogIds ids;
+    private final ExecutorService writeExecutor;
+    private final ExecutorService flushExecutor;
+    private final long maxFileSize;
+    private final DirectEntryLoggerStats stats;
+    private final ByteBufAllocator allocator;
+    private final BufferPool writeBuffers;
+    private final int readBufferSize;
+    private final int maxSaneEntrySize;
+    private final Set<Integer> unflushedLogs;
+
+    private WriterWithMetadata curWriter;
+
+    private List<Future<?>> pendingFlushes;
+    private final NativeIO nativeIO;
+    private final List<Cache<?, ?>> allCaches = new CopyOnWriteArrayList<>();
+    private final ThreadLocal<Cache<Integer, LogReader>> caches;
+
+    private static final int NUMBER_OF_WRITE_BUFFERS = 8;
+
+    public DirectEntryLogger(File ledgerDir,
+                             EntryLogIds ids,
+                             NativeIO nativeIO,
+                             ByteBufAllocator allocator,
+                             ExecutorService writeExecutor,
+                             ExecutorService flushExecutor,
+                             long maxFileSize,
+                             int maxSaneEntrySize,
+                             long totalWriteBufferSize,
+                             long totalReadBufferSize,
+                             int readBufferSize,
+                             int numReadThreads,
+                             int maxFdCacheTimeSeconds,
+                             Slogger slogParent,
+                             StatsLogger stats) throws IOException {
+        this.ledgerDir = ledgerDir;
+        this.flushExecutor = flushExecutor;
+        this.writeExecutor = writeExecutor;
+        this.pendingFlushes = new ArrayList<>();
+        this.nativeIO = nativeIO;
+        this.unflushedLogs = ConcurrentHashMap.newKeySet();
+
+        this.maxFileSize = maxFileSize;
+        this.maxSaneEntrySize = maxSaneEntrySize;
+        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
+        this.ids = ids;
+        this.slog = slogParent.kv("directory", ledgerDir).ctx();
+
+        this.stats = new DirectEntryLoggerStats(stats);
+
+        this.allocator = allocator;
+
+        int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
+        this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
+
+        // The total read buffer memory needs to get split across all the read threads, since the caches
+        // are thread-specific and we want to ensure we don't pass the total memory limit.
+        long perThreadBufferSize = totalReadBufferSize / numReadThreads;
+
+        // if the amount of total read buffer size is too low, and/or the number of read threads is too high
+        // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
+        // from the cache
+        if (perThreadBufferSize < readBufferSize) {

Review comment:
       ```diff
   diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
   index 41417df57..fab76d2af 100644
   --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
   +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
   @@ -98,7 +98,7 @@ public class DirectEntryLogger implements EntryLoggerIface {
                                 int maxSaneEntrySize,
                                 long totalWriteBufferSize,
                                 long totalReadBufferSize,
   -                             int readBufferSize,
   +                             int unalignedReadBufferSize,
                                 int numReadThreads,
                                 int maxFdCacheTimeSeconds,
                                 Slogger slogParent,
   @@ -112,7 +112,7 @@ public class DirectEntryLogger implements EntryLoggerIface {
   
            this.maxFileSize = maxFileSize;
            this.maxSaneEntrySize = maxSaneEntrySize;
   -        this.readBufferSize = Buffer.nextAlignment(readBufferSize);
   +        this.readBufferSize = Buffer.nextAlignment(unalignedReadBufferSize);
            this.ids = ids;
            this.slog = slogParent.kv("directory", ledgerDir).ctx();
   
   @@ -131,7 +131,7 @@ public class DirectEntryLogger implements EntryLoggerIface {
            // then the perThreadBufferSize can be lower than the readBufferSize causing immediate eviction of readers
            // from the cache
            if (perThreadBufferSize < readBufferSize) {
   -            slog.kv("reason", "perThreadBufferSize lower than readBufferSize (causes immediate reader cache eviction)")
   +            slog.kv("reason", "perThreadBufferSize lower than aligned readBufferSize (causes immediate reader cache eviction)")
                    .kv("totalReadBufferSize", totalReadBufferSize)
                    .kv("totalNumReadThreads", numReadThreads)
                    .kv("readBufferSize", readBufferSize)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to