sijie commented on a change in pull request #1281: Issue #570: Introducing 
EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180942977
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##########
 @@ -499,176 +576,163 @@ void createNewLog() throws IOException {
     EntryLoggerAllocator getEntryLoggerAllocator() {
         return entryLoggerAllocator;
     }
-    /**
-     * Append the ledger map at the end of the entry log.
-     * Updates the entry log file header with the offset and size of the map.
-     */
-    private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws 
IOException {
-        long ledgerMapOffset = entryLogChannel.position();
-
-        ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-        int numberOfLedgers = (int) ledgersMap.size();
-
-        // Write the ledgers map into several batches
-
-        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-        try {
-            ledgersMap.forEach(new BiConsumerLong() {
-                int remainingLedgers = numberOfLedgers;
-                boolean startNewBatch = true;
-                int remainingInBatch = 0;
-
-                @Override
-                public void accept(long ledgerId, long size) {
-                    if (startNewBatch) {
-                        int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
-                        int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-                        serializedMap.clear();
-                        serializedMap.writeInt(ledgerMapSize - 4);
-                        serializedMap.writeLong(INVALID_LID);
-                        serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-                        serializedMap.writeInt(batchSize);
-
-                        startNewBatch = false;
-                        remainingInBatch = batchSize;
-                    }
-                    // Dump the ledger in the current batch
-                    serializedMap.writeLong(ledgerId);
-                    serializedMap.writeLong(size);
-                    --remainingLedgers;
-
-                    if (--remainingInBatch == 0) {
-                        // Close current batch
-                        try {
-                            entryLogChannel.write(serializedMap);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-
-                        startNewBatch = true;
-                    }
-                }
-            });
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof IOException) {
-                throw (IOException) e.getCause();
-            } else {
-                throw e;
-            }
-        } finally {
-            serializedMap.release();
-        }
-        // Flush the ledger's map out before we write the header.
-        // Otherwise the header might point to something that is not fully 
written
-        entryLogChannel.flush();
-
-        // Update the headers with the map offset and count of ledgers
-        ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-        mapInfo.putLong(ledgerMapOffset);
-        mapInfo.putInt(numberOfLedgers);
-        mapInfo.flip();
-        entryLogChannel.fileChannel.write(mapInfo, 
LEDGERS_MAP_OFFSET_POSITION);
-    }
 
     /**
      * An allocator pre-allocates entry log files.
      */
     class EntryLoggerAllocator {
 
         private long preallocatedLogId;
-        private Future<BufferedLogChannel> preallocation = null;
+        Future<BufferedLogChannel> preallocation = null;
         private ExecutorService allocatorExecutor;
-        private final Object createEntryLogLock = new Object();
-        private final Object createCompactionLogLock = new Object();
 
         EntryLoggerAllocator(long logId) {
             preallocatedLogId = logId;
             allocatorExecutor = Executors.newSingleThreadExecutor();
         }
 
-        BufferedLogChannel createNewLog() throws IOException {
-            synchronized (createEntryLogLock) {
-                BufferedLogChannel bc;
-                if (!entryLogPreAllocationEnabled){
-                    // create a new log directly
-                    bc = allocateNewLog();
-                    return bc;
-                } else {
-                    // allocate directly to response request
-                    if (null == preallocation){
-                        bc = allocateNewLog();
+        synchronized long getPreallocatedLogId(){
+            return preallocatedLogId;
+        }
+
+        synchronized BufferedLogChannel createNewLog() throws IOException {
+            BufferedLogChannel bc;
+            if (!entryLogPreAllocationEnabled || null == preallocation) {
+                // initialization time to create a new log
+                bc = allocateNewLog();
+            } else {
+                // has a preallocated entry log
+                try {
+                    /*
+                     * both createNewLog and allocateNewLog are synchronized on
+                     * EntryLoggerAllocator.this object. So it is possible that
+                     * a thread calling createNewLog would attain the lock on
+                     * this object and get to this point but preallocation
+                     * Future is starving for lock on EntryLoggerAllocator.this
+                     * to execute allocateNewLog. Here since we attained lock
+                     * for this it means preallocation future must have either
+                     * completed creating new log or still waiting for lock on
+                     * this object to execute allocateNewLog method. So we
+                     * should try to get result of the future without waiting.
+                     * If it fails with TimeoutException then call
+                     * allocateNewLog explicitly since we are holding the lock
+                     * on this anyhow.
+                     *
+                     */
+                    bc = preallocation.get(0, TimeUnit.MILLISECONDS);
+                } catch (ExecutionException ee) {
+                    if (ee.getCause() instanceof IOException) {
+                        throw (IOException) (ee.getCause());
                     } else {
-                        // has a preallocated entry log
-                        try {
-                            bc = preallocation.get();
-                        } catch (ExecutionException ee) {
-                            if (ee.getCause() instanceof IOException) {
-                                throw (IOException) (ee.getCause());
-                            } else {
-                                throw new IOException("Error to execute entry 
log allocation.", ee);
-                            }
-                        } catch (CancellationException ce) {
-                            throw new IOException("Task to allocate a new 
entry log is cancelled.", ce);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            throw new IOException("Intrrupted when waiting a 
new entry log to be allocated.", ie);
-                        }
+                        throw new IOException("Error to execute entry log 
allocation.", ee);
                     }
-                    // preallocate a new log in background upon every call
-                    preallocation = allocatorExecutor.submit(() -> 
allocateNewLog());
-                    return bc;
+                } catch (CancellationException ce) {
+                    throw new IOException("Task to allocate a new entry log is 
cancelled.", ce);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Intrrupted when waiting a new entry 
log to be allocated.", ie);
+                } catch (TimeoutException e) {
+                    LOG.debug("Received TimeoutException while trying to get 
preallocation future result,"
+                            + " which means that Future is waiting for 
acquiring lock on EntryLoggerAllocator.this");
+                    bc = allocateNewLog();
                 }
             }
+            if (entryLogPreAllocationEnabled) {
+                /*
+                 * We should submit new callable / create new instance of 
future only if the previous preallocation is
+                 * null or if it is done. This is needed because previous 
preallocation has not completed its execution
+                 * since it is waiting for lock on EntryLoggerAllocator.this.
+                 */
+                if ((preallocation == null) || preallocation.isDone()) {
+                    preallocation = allocatorExecutor.submit(new 
Callable<BufferedLogChannel>() {
+                        @Override
+                        public BufferedLogChannel call() throws IOException {
+                            return allocateNewLog();
+                        }
+                    });
+                }
+            }
+            LOG.info("Created new entry logger {}.", bc.getLogId());
+            return bc;
         }
 
-        BufferedLogChannel createNewLogForCompaction() throws IOException {
-            synchronized (createCompactionLogLock) {
+        synchronized BufferedLogChannel createNewLogForCompaction() throws 
IOException {
                 return allocateNewLog(COMPACTING_SUFFIX);
-            }
         }
 
-        private BufferedLogChannel allocateNewLog() throws IOException {
-            return allocateNewLog(".log");
+        synchronized BufferedLogChannel allocateNewLog() throws IOException {
+            return allocateNewLog(LOG_FILE_SUFFIX);
         }
 
         /**
          * Allocate a new log file.
          */
-        private BufferedLogChannel allocateNewLog(String suffix) throws 
IOException {
-            List<File> list = 
ledgerDirsManager.getWritableLedgerDirsForNewLog();
-            Collections.shuffle(list);
-            // It would better not to overwrite existing entry log files
-            File newLogFile = null;
-            do {
+        synchronized BufferedLogChannel allocateNewLog(String suffix) throws 
IOException {
+            File dirForNextEntryLog;
+            List<File> list;
+
+            try {
+                list = ledgerDirsManager.getWritableLedgerDirs();
+            } catch (NoWritableLedgerDirException nwe) {
+                if (!ledgerDirsManager.hasWritableLedgerDirs()) {
+                    list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
+                } else {
+                    LOG.error("All Disks are not full, but 
getWritableLedgerDirs threw exception ", nwe);
+                    throw nwe;
+                }
+            }
+
+            dirForNextEntryLog = entryLogManager.getDirForNextEntryLog(list);
+
+            List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+            String logFileName;
+            while (true) {
 
 Review comment:
   - can you have a test case reproduce the behavior? I am not seeing how the 
case you described here can happen with original logic.
   
   - if there is a problem with this logic, I think it is better to fix in a 
subsequent change. It is not really related to abstract EntryLogManager.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to