sijie commented on a change in pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281#discussion_r180505515
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##########
 @@ -120,21 +124,104 @@ public ConcurrentLongLongHashMap getLedgersMap() {
             return entryLogMetadata.getLedgersMap();
         }
 
+        public Long getLedgerIdAssigned() {
+            return ledgerIdAssigned;
+        }
+
+        public void setLedgerIdAssigned(Long ledgerId) {
+            this.ledgerIdAssigned = ledgerId;
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(BufferedChannel.class)
                 .add("logId", logId)
                 .add("logFile", logFile)
+                .add("ledgerIdAssigned", ledgerIdAssigned)
                 .toString();
         }
+
+        /**
+         * Append the ledger map at the end of the entry log.
+         * Updates the entry log file header with the offset and size of the map.
+         */
+        private void appendLedgersMap() throws IOException {
+
+            long ledgerMapOffset = this.position();
+
+            ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap();
+            int numberOfLedgers = (int) ledgersMap.size();
+
+            // Write the ledgers map into several batches
+
+            final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+            final ByteBuf serializedMap = 
ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+
+            try {
+                ledgersMap.forEach(new BiConsumerLong() {
+                    int remainingLedgers = numberOfLedgers;
+                    boolean startNewBatch = true;
+                    int remainingInBatch = 0;
+
+                    @Override
+                    public void accept(long ledgerId, long size) {
+                        if (startNewBatch) {
+                            int batchSize = Math.min(remainingLedgers, 
LEDGERS_MAP_MAX_BATCH_SIZE);
+                            int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + 
LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+                            serializedMap.clear();
+                            serializedMap.writeInt(ledgerMapSize - 4);
+                            serializedMap.writeLong(INVALID_LID);
+                            serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+                            serializedMap.writeInt(batchSize);
+
+                            startNewBatch = false;
+                            remainingInBatch = batchSize;
+                        }
+                        // Dump the ledger in the current batch
+                        serializedMap.writeLong(ledgerId);
+                        serializedMap.writeLong(size);
+                        --remainingLedgers;
+
+                        if (--remainingInBatch == 0) {
+                            // Close current batch
+                            try {
+                                write(serializedMap);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            startNewBatch = true;
+                        }
+                    }
+                });
+            } catch (RuntimeException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw e;
+                }
+            } finally {
+                serializedMap.release();
+            }
+            // Flush the ledger's map out before we write the header.
+            // Otherwise the header might point to something that is not fully
+            // written
+            super.flush();
+
+            // Update the headers with the map offset and count of ledgers
+            ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+            mapInfo.putLong(ledgerMapOffset);
+            mapInfo.putInt(numberOfLedgers);
+            mapInfo.flip();
+            this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+        }
     }
 
-    volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
     private final boolean entryLogPerLedgerEnabled;
-    private final AtomicBoolean shouldCreateNewEntryLog = new 
AtomicBoolean(false);
 
-    private volatile long leastUnflushedLogId;
+    RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
 Review comment:
   final (the `recentlyCreatedEntryLogsStatus` field above should be declared `final`)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to