lhotari commented on code in PR #22799:
URL: https://github.com/apache/pulsar/pull/22799#discussion_r1770803012


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -558,76 +590,178 @@ protected void recoverFromLedger(final ManagedCursorInfo 
info, final VoidCallbac
 
             // Read the last entry in the ledger
             long lastEntryInLedger = lh.getLastAddConfirmed();
+            recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger);
+        };
+
+        try {
+            bookkeeper.asyncOpenLedger(ledgerId, digestType, 
getConfig().getPassword(), openCallback,
+                    null);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered error on opening cursor ledger {} for 
cursor {}",
+                ledger.getName(), ledgerId, name, t);
+            
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, 
null);
+        }
+    }
+
+    private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
+                                            VoidCallback callback,
+                                            LedgerHandle lh,
+                                            long entryId) {
+        long ledgerId = lh.getId();
+
+        if (entryId < 0) {
+            log.warn("[{}] Error reading from metadata ledger {} for cursor 
{}: No valid entries in ledger",
+                    ledger.getName(), ledgerId, name);
+            // Rewind to last cursor snapshot available
+            initialize(getRollbackPosition(info), Collections.emptyMap(), 
cursorProperties, callback);
+            return;
+        }
 
-            if (lastEntryInLedger < 0) {
-                log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: No entries in ledger",
-                        ledger.getName(), ledgerId, name);
-                // Rewind to last cursor snapshot available
+        lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
+            }
+            if (isBkErrorNotRecoverable(rc1)) {
+                log.error("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
+                        ledgerId, name, BKException.getMessage(rc1));
+                // Rewind to oldest entry available
                 initialize(getRollbackPosition(info), Collections.emptyMap(), 
cursorProperties, callback);
                 return;
+            } else if (rc1 != BKException.Code.OK) {
+                log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
+                        ledgerId, name, BKException.getMessage(rc1));
+
+                callback.operationFailed(createManagedLedgerException(rc1));
+                return;
             }
 
-            lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, 
lh1, seq, ctx1) -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
+            LedgerEntry entry = seq.nextElement();
+            byte[] data = entry.getEntry();

Review Comment:
   Please replace this with `getEntryBuffer` so that a Netty `ByteBuf` 
is used instead of `byte[]`. 
   Large arrays add significant GC overhead, which is why Netty `ByteBuf` is 
preferred.
   It would be better to refactor the logic to operate on Netty ByteBufs.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -3123,47 +3321,220 @@ private 
List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
         }
     }
 
+    private void buildBatchEntryDeletionIndexInfoList(
+            PositionInfoUtils.BatchedEntryDeletionIndexInfoConsumer consumer) {
+        if (!getConfig().isDeletionAtBatchIndexLevelEnabled()) {
+            return;
+        }
+        int maxBatchDeletedIndexToPersist = 
getConfig().getMaxBatchDeletedIndexToPersist();
+        lock.readLock().lock();
+        try {
+            if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || 
batchDeletedIndexes.isEmpty()) {
+                return;
+            }
+            int count = 0;
+            Iterator<Map.Entry<Position, BitSetRecyclable>> iterator = 
batchDeletedIndexes.entrySet().iterator();
+            while (iterator.hasNext() && count < 
maxBatchDeletedIndexToPersist) {
+                Map.Entry<Position, BitSetRecyclable> entry = iterator.next();
+                long[] array = entry.getValue().toLongArray();
+                consumer.acceptRange(entry.getKey().getLedgerId(), 
entry.getKey().getEntryId(), array);
+                count++;
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry 
mdEntry, final VoidCallback callback) {
+        checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize mus be 
greater than zero");
+        long now = System.nanoTime();
         Position position = mdEntry.newPosition;
-        PositionInfo pi = 
PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
-                .setEntryId(position.getEntryId())
-                
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
-                
.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
-                
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();
-
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Cursor {} Appending to ledger={} position={}", 
ledger.getName(), name, lh.getId(),
                     position);
         }
 
         requireNonNull(lh);
-        byte[] data = pi.toByteArray();
-        lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
-            if (rc == BKException.Code.OK) {
+        ByteBuf rawData = PositionInfoUtils.serializePositionInfo(mdEntry, 
position,
+                this::scanIndividualDeletedMessageRanges, 
this::buildBatchEntryDeletionIndexInfoList,
+                lastSerializedSize);
+        long endSer = System.nanoTime();
+        this.lastSerializedSize = rawData.readableBytes();
+
+        // rawData is released by compressDataIfNeeded if needed
+        ByteBuf data = compressDataIfNeeded(rawData, lh);
+
+        long endCompress = System.nanoTime();
+
+        int offset = 0;
+        final int len = data.readableBytes();
+        int numParts = 1 + (len / maxPositionChunkSize);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Cursor {} Appending to ledger={} position={} data 
size {} bytes, numParts {}",
+                    ledger.getName(), name, lh.getId(),
+                    position, len, numParts);
+        }
+        log.info("[{}] Cursor {} Appending to ledger={} position={} data size 
{} bytes, "
+                        + "numParts {}, serializeTime {} ms"
+                        + " compressTime {} ms, total {} ms", 
ledger.getName(), name, lh.getId(),
+                position, len, numParts,
+                (endSer - now) / 1000000,
+                (endCompress - endSer)  / 1000000, (endCompress - now) / 
1000000);
+
+        if (numParts == 1) {
+            // no need for chunking
+            // asyncAddEntry will release data ByteBuf
+            writeToBookKeeperLastChunk(lh, mdEntry, callback, data, len, 
position, () -> {});
+        } else {
+            // chunking
+            int part = 0;
+            while (part != numParts) {
+                int remaining = len - offset;
+                int currentLen = Math.min(maxPositionChunkSize, remaining);
+                boolean isLast = part == numParts - 1;
+
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Updated cursor {} position {} in 
meta-ledger {}", ledger.getName(), name, position,
-                            lh1.getId());
+                    log.debug("[{}] Cursor {} Appending to ledger={} 
position={} data size {} bytes, numParts {} "
+                                    + "part {} offset {} len {}",
+                            ledger.getName(), name, lh.getId(),
+                            position, len, numParts, part, offset, currentLen);
                 }
 
-                rolloverLedgerIfNeeded(lh1);
+                // just send the addEntry, BK client guarantees that each 
entry succeeds only if all
+                // the previous entries succeeded
+                // asyncAddEntry takes ownership of the buffer
+                lh.asyncAddEntry(data.retainedSlice(offset, currentLen), (rc, 
lh1, entryId, ctx) -> {
+                }, null);
 
-                mbean.persistToLedger(true);
-                mbean.addWriteCursorLedgerSize(data.length);
-                callback.operationComplete();
-            } else {
-                log.warn("[{}] Error updating cursor {} position {} in 
meta-ledger {}: {}", ledger.getName(), name,
-                        position, lh1.getId(), BKException.getMessage(rc));
-                // If we've had a write error, the ledger will be 
automatically closed, we need to create a new one,
-                // in the meantime the mark-delete will be queued.
-                STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, 
State.Open, State.NoLedger);
+                if (isLast) {
+                    // last, send a footer with the number of parts
+                    ChunkSequenceFooter footer = new 
ChunkSequenceFooter(numParts, len);
+                    byte[] footerData;
+                    try {
+                        footerData = ObjectMapperFactory.getMapper()
+                                .getObjectMapper().writeValueAsBytes(footer);
+                    } catch (JsonProcessingException e) {
+                        // this is almost impossible to happen
+                        log.error("Cannot serialize footer {}", footer);
+                        return;
+                    }
+                    // need to explicitly release data ByteBuf
+                    writeToBookKeeperLastChunk(lh, mdEntry, callback,
+                            Unpooled.wrappedBuffer(footerData), len, position, 
data::release);
+                }
+                offset += currentLen;
+                part++;
+            }
+        }
+    }
 
-                // Before giving up, try to persist the position in the 
metadata store.
-                persistPositionToMetaStore(mdEntry, callback);
+
+    private void writeToBookKeeperLastChunk(LedgerHandle lh,
+                                            MarkDeleteEntry mdEntry,
+                                            VoidCallback callback,
+                                            ByteBuf data,
+                                            int totalLength,
+                                            Position position,
+                                            Runnable onFinished) {
+        lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
+            try {
+                if (rc == BKException.Code.OK) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Updated cursor {} position {} in 
meta-ledger {}", ledger.getName(), name,
+                                position,
+                                lh1.getId());
+                    }
+
+                    rolloverLedgerIfNeeded(lh1);
+
+                    mbean.persistToLedger(true);
+                    mbean.addWriteCursorLedgerSize(totalLength);
+                    callback.operationComplete();
+                } else {
+                    log.warn("[{}] Error updating cursor {} position {} in 
meta-ledger {}: {}", ledger.getName(), name,
+                            position, lh1.getId(), BKException.getMessage(rc));
+                    // If we've had a write error, the ledger will be 
automatically closed, we need to create a new one,
+                    // in the meantime the mark-delete will be queued.
+                    STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, 
State.Open, State.NoLedger);
+
+                    // Before giving up, try to persist the position in the 
metadata store.
+                    persistPositionToMetaStore(mdEntry, callback);
+                }
+            } finally {
+                onFinished.run();
             }
         }, null);
     }
 
+    private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
+        byte[] pulsarCursorInfoCompression =
+                
lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE);
+        if (pulsarCursorInfoCompression == null) {
+            return data;
+        }
+
+        try {
+            int uncompressedSize = data.readableBytes();
+            String pulsarCursorInfoCompressionString = new 
String(pulsarCursorInfoCompression);
+            CompressionCodec compressionCodec = 
CompressionCodecProvider.getCompressionCodec(
+                    
CompressionType.valueOf(pulsarCursorInfoCompressionString));
+            ByteBuf encode = compressionCodec.encode(data);
+
+            int compressedSize = encode.readableBytes();
+
+            ByteBuf szBuf = 
PulsarByteBufAllocator.DEFAULT.buffer(4).writeInt(uncompressedSize);
+
+            CompositeByteBuf result = 
PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+            result.addComponent(szBuf)
+                    .addComponent(encode);
+            result.readerIndex(0)
+                    .writerIndex(4 + compressedSize);
+
+            int ratio = (int) (compressedSize * 100.0 / uncompressedSize);
+            log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, 
original size {} bytes, ratio {}%)",
+                    ledger.getName(), name, compressedSize, 
pulsarCursorInfoCompressionString, uncompressedSize, ratio);
+            return result;
+        } finally {
+            data.release();
+        }
+    }
+
+    static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) {

Review Comment:
   Refactor this to use Netty ByteBufs instead of `byte[]`



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -558,76 +590,178 @@ protected void recoverFromLedger(final ManagedCursorInfo 
info, final VoidCallbac
 
             // Read the last entry in the ledger
             long lastEntryInLedger = lh.getLastAddConfirmed();
+            recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger);
+        };
+
+        try {
+            bookkeeper.asyncOpenLedger(ledgerId, digestType, 
getConfig().getPassword(), openCallback,
+                    null);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered error on opening cursor ledger {} for 
cursor {}",
+                ledger.getName(), ledgerId, name, t);
+            
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, 
null);
+        }
+    }
+
+    private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
+                                            VoidCallback callback,
+                                            LedgerHandle lh,
+                                            long entryId) {
+        long ledgerId = lh.getId();
+
+        if (entryId < 0) {
+            log.warn("[{}] Error reading from metadata ledger {} for cursor 
{}: No valid entries in ledger",
+                    ledger.getName(), ledgerId, name);
+            // Rewind to last cursor snapshot available
+            initialize(getRollbackPosition(info), Collections.emptyMap(), 
cursorProperties, callback);
+            return;
+        }
 
-            if (lastEntryInLedger < 0) {
-                log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: No entries in ledger",
-                        ledger.getName(), ledgerId, name);
-                // Rewind to last cursor snapshot available
+        lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
+            }
+            if (isBkErrorNotRecoverable(rc1)) {
+                log.error("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
+                        ledgerId, name, BKException.getMessage(rc1));
+                // Rewind to oldest entry available
                 initialize(getRollbackPosition(info), Collections.emptyMap(), 
cursorProperties, callback);
                 return;
+            } else if (rc1 != BKException.Code.OK) {
+                log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
+                        ledgerId, name, BKException.getMessage(rc1));
+
+                callback.operationFailed(createManagedLedgerException(rc1));
+                return;
             }
 
-            lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, 
lh1, seq, ctx1) -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
+            LedgerEntry entry = seq.nextElement();
+            byte[] data = entry.getEntry();
+            try {
+                ChunkSequenceFooter chunkSequenceFooter = 
parseChunkSequenceFooter(data);
+                if (chunkSequenceFooter.numParts > 0) {
+                    readChunkSequence(callback, lh, entryId, 
chunkSequenceFooter);
+                } else {
+                    Throwable res = tryCompleteCursorRecovery(lh, data);
+                    if (res == null) {
+                        callback.operationComplete();
+                    } else {
+                        log.warn("[{}] Error recovering from metadata ledger 
{} entry {} for cursor {}. "
+                                        + "Will try recovery from previous 
entry.",
+                                ledger.getName(), ledgerId, entryId, name, 
res);
+                        //try recovery from previous entry
+                        recoverFromLedgerByEntryId(info, callback, lh, entryId 
- 1);
+                    }
                 }
-                if (isBkErrorNotRecoverable(rc1)) {
-                    log.error("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
-                            ledgerId, name, BKException.getMessage(rc1));
-                    // Rewind to oldest entry available
-                    initialize(getRollbackPosition(info), 
Collections.emptyMap(), cursorProperties, callback);
-                    return;
-                } else if (rc1 != BKException.Code.OK) {
-                    log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
-                            ledgerId, name, BKException.getMessage(rc1));
+            } catch (IOException error) {
+                log.error("Cannot parse footer", error);
+                log.warn("[{}] Error recovering from metadata ledger {} entry 
{} for cursor {}, cannot parse footer. "
+                                + "Will try recovery from previous entry.",
+                        ledger.getName(), ledgerId, entryId, name, error);
+                recoverFromLedgerByEntryId(info, callback, lh, entryId - 1);
+            }
+        }, null);
+    }
 
-                    
callback.operationFailed(createManagedLedgerException(rc1));
-                    return;
+    private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
+                                   long footerPosition, ChunkSequenceFooter 
chunkSequenceFooter) {
+        long startPos = footerPosition - chunkSequenceFooter.numParts;
+        long endPos = footerPosition - 1;
+        log.info("readChunkSequence from pos {}, num parts {}, startPos {}, 
endPos {}",
+                footerPosition, chunkSequenceFooter.numParts, startPos, 
endPos);
+        lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() 
{
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> entries, Object ctx) {
+                ByteArrayOutputStream buffer = new ByteArrayOutputStream();

Review Comment:
   `ByteArrayOutputStream` adds a lot of GC overhead compared to using 
Netty ByteBufs.
   Please refactor this to use Netty ByteBufs instead of 
ByteArrayOutputStream.
   
   (By the way, there are also a lot of gotchas when using Netty ByteBufs; I 
happened to learn quite a few while working on optimizing 
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 . The gotchas mainly apply when generating very large response buffers, as 
is the case with metrics. The Prometheus metrics results might be 500MB of 
text in certain worst cases with topic-level metrics enabled in brokers. For 
the cursor code in this PR we wouldn't have to be concerned about those 
challenges with Netty ByteBufs, since I guess the size of the output isn't 
in that range.)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to