lhotari commented on code in PR #22799:
URL: https://github.com/apache/pulsar/pull/22799#discussion_r1778236192


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -453,8 +456,13 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] 
data) throws InvalidProto
             try {
                 MLDataFormats.ManagedLedgerInfoMetadata metadata =
                         
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
-                return 
ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
-                        .decode(byteBuf, 
metadata.getUncompressedSize()).nioBuffer());
+                ByteBuf decode = 
getCompressionCodec(metadata.getCompressionType())
+                        .decode(byteBuf, metadata.getUncompressedSize());
+                try {
+                    return ManagedLedgerInfo.parseFrom(decode.nioBuffer());
+                } finally {
+                    decode.release();

Review Comment:
   When using LightProto, the buffer shouldn't be released until the parsed 
instance is no longer used. This is due to the implementation detail that 
String properties will be lazily resolved from the input buffer.
   It's possible to resolve this by making a copy with 
`ManagedLedgerInfo.copyFrom(ManagedLedgerInfo.parseFrom(decode.nioBuffer()))` 
unless there's a solution in place to ensure that buffers aren't released as 
long as the instance is used.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -475,8 +483,13 @@ public ManagedCursorInfo parseManagedCursorInfo(byte[] 
data) throws InvalidProto
             try {
                 MLDataFormats.ManagedCursorInfoMetadata metadata =
                         
MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
-                return 
ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
-                        .decode(byteBuf, 
metadata.getUncompressedSize()).nioBuffer());
+                ByteBuf decode = 
getCompressionCodec(metadata.getCompressionType())
+                        .decode(byteBuf, metadata.getUncompressedSize());
+                try {
+                    return ManagedCursorInfo.parseFrom(decode.nioBuffer());
+                } finally {
+                    decode.release();

Review Comment:
   Please see the previous comment about LightProto and buffer lifecycle.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -3123,47 +3337,217 @@ private 
List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
         }
     }
 
+    private void buildBatchEntryDeletionIndexInfoList(
+            PositionInfoUtils.BatchedEntryDeletionIndexInfoConsumer consumer) {
+        if (!getConfig().isDeletionAtBatchIndexLevelEnabled()) {
+            return;
+        }
+        int maxBatchDeletedIndexToPersist = 
getConfig().getMaxBatchDeletedIndexToPersist();
+        lock.readLock().lock();
+        try {
+            if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || 
batchDeletedIndexes.isEmpty()) {
+                return;
+            }
+            int count = 0;
+            Iterator<Map.Entry<Position, BitSetRecyclable>> iterator = 
batchDeletedIndexes.entrySet().iterator();
+            while (iterator.hasNext() && count < 
maxBatchDeletedIndexToPersist) {
+                Map.Entry<Position, BitSetRecyclable> entry = iterator.next();
+                long[] array = entry.getValue().toLongArray();
+                consumer.acceptRange(entry.getKey().getLedgerId(), 
entry.getKey().getEntryId(), array);
+                count++;
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry 
mdEntry, final VoidCallback callback) {
+        checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize mus be 
greater than zero");
+        long now = System.nanoTime();
         Position position = mdEntry.newPosition;
-        PositionInfo pi = 
PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
-                .setEntryId(position.getEntryId())
-                
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
-                
.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
-                
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();
-
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Cursor {} Appending to ledger={} position={}", 
ledger.getName(), name, lh.getId(),
                     position);
         }
 
         requireNonNull(lh);
-        byte[] data = pi.toByteArray();
-        lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
-            if (rc == BKException.Code.OK) {
+        ByteBuf rawData = PositionInfoUtils.serializePositionInfo(mdEntry, 
position,
+                this::scanIndividualDeletedMessageRanges, 
this::buildBatchEntryDeletionIndexInfoList,
+                lastSerializedSize);
+        long endSer = System.nanoTime();
+        this.lastSerializedSize = rawData.readableBytes();
+
+        // rawData is released by compressDataIfNeeded if needed
+        ByteBuf data = compressDataIfNeeded(rawData, lh);
+
+        long endCompress = System.nanoTime();
+
+        int offset = 0;
+        final int len = data.readableBytes();
+        int numParts = isChunkingEnabled ? 1 + (len / maxPositionChunkSize) : 
1;
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Cursor {} Appending to ledger={} position={} data 
size {} bytes, numParts {}",
+                    ledger.getName(), name, lh.getId(),
+                    position, len, numParts);
+        }
+        log.info("[{}] Cursor {} Appending to ledger={} position={} data size 
{} bytes, "
+                        + "numParts {}, serializeTime {} ms"
+                        + " compressTime {} ms, total {} ms", 
ledger.getName(), name, lh.getId(),
+                position, len, numParts,
+                (endSer - now) / 1000000,
+                (endCompress - endSer)  / 1000000, (endCompress - now) / 
1000000);
+
+        if (numParts == 1) {
+            // no need for chunking
+            // asyncAddEntry will release data ByteBuf
+            writeToBookKeeperLastChunk(lh, mdEntry, callback, data, len, 
position, () -> {});
+        } else {
+            // chunking
+            int part = 0;
+            while (part != numParts) {
+                int remaining = len - offset;
+                int currentLen = Math.min(maxPositionChunkSize, remaining);
+                boolean isLast = part == numParts - 1;
+
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Updated cursor {} position {} in 
meta-ledger {}", ledger.getName(), name, position,
-                            lh1.getId());
+                    log.debug("[{}] Cursor {} Appending to ledger={} 
position={} data size {} bytes, numParts {} "
+                                    + "part {} offset {} len {}",
+                            ledger.getName(), name, lh.getId(),
+                            position, len, numParts, part, offset, currentLen);
                 }
 
-                rolloverLedgerIfNeeded(lh1);
+                // just send the addEntry, BK client guarantees that each 
entry succeeds only if all
+                // the previous entries succeeded
+                // asyncAddEntry takes ownership of the buffer
+                lh.asyncAddEntry(data.retainedSlice(offset, currentLen), (rc, 
lh1, entryId, ctx) -> {
+                }, null);
 
-                mbean.persistToLedger(true);
-                mbean.addWriteCursorLedgerSize(data.length);
-                callback.operationComplete();
-            } else {
-                log.warn("[{}] Error updating cursor {} position {} in 
meta-ledger {}: {}", ledger.getName(), name,
-                        position, lh1.getId(), BKException.getMessage(rc));
-                // If we've had a write error, the ledger will be 
automatically closed, we need to create a new one,
-                // in the meantime the mark-delete will be queued.
-                STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, 
State.Open, State.NoLedger);
+                if (isLast) {
+                    // last, send a footer with the number of parts
+                    ChunkSequenceFooter footer = new 
ChunkSequenceFooter(numParts, len);
+                    byte[] footerData;
+                    try {
+                        footerData = ObjectMapperFactory.getMapper()
+                                .getObjectMapper().writeValueAsBytes(footer);
+                    } catch (JsonProcessingException e) {
+                        // this is almost impossible to happen
+                        log.error("Cannot serialize footer {}", footer);
+                        return;
+                    }
+                    // need to explicitly release data ByteBuf
+                    writeToBookKeeperLastChunk(lh, mdEntry, callback,
+                            Unpooled.wrappedBuffer(footerData), len, position, 
data::release);
+                }
+                offset += currentLen;
+                part++;
+            }
+        }
+    }
+
+
+    private void writeToBookKeeperLastChunk(LedgerHandle lh,
+                                            MarkDeleteEntry mdEntry,
+                                            VoidCallback callback,
+                                            ByteBuf data,
+                                            int totalLength,
+                                            Position position,
+                                            Runnable onFinished) {
+        lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
+            try {
+                if (rc == BKException.Code.OK) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Updated cursor {} position {} in 
meta-ledger {}", ledger.getName(), name,
+                                position,
+                                lh1.getId());
+                    }
+
+                    rolloverLedgerIfNeeded(lh1);
 
-                // Before giving up, try to persist the position in the 
metadata store.
-                persistPositionToMetaStore(mdEntry, callback);
+                    mbean.persistToLedger(true);
+                    mbean.addWriteCursorLedgerSize(totalLength);
+                    callback.operationComplete();
+                } else {
+                    log.warn("[{}] Error updating cursor {} position {} in 
meta-ledger {}: {}", ledger.getName(), name,
+                            position, lh1.getId(), BKException.getMessage(rc));
+                    // If we've had a write error, the ledger will be 
automatically closed, we need to create a new one,
+                    // in the meantime the mark-delete will be queued.
+                    STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, 
State.Open, State.NoLedger);
+
+                    // Before giving up, try to persist the position in the 
metadata store.
+                    persistPositionToMetaStore(mdEntry, callback);
+                }
+            } finally {
+                onFinished.run();
             }
         }, null);
     }
 
+    private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
+        byte[] pulsarCursorInfoCompression =
+                
lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE);
+        if (pulsarCursorInfoCompression == null) {
+            return data;
+        }
+
+        try {
+            int uncompressedSize = data.readableBytes();
+            String pulsarCursorInfoCompressionString = new 
String(pulsarCursorInfoCompression);
+            CompressionCodec compressionCodec = 
CompressionCodecProvider.getCompressionCodec(
+                    
CompressionType.valueOf(pulsarCursorInfoCompressionString));
+            ByteBuf encode = compressionCodec.encode(data);
+
+            int compressedSize = encode.readableBytes();
+
+            ByteBuf szBuf = 
PulsarByteBufAllocator.DEFAULT.buffer(4).writeInt(uncompressedSize);
+
+            CompositeByteBuf result = 
PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+            result.addComponent(szBuf)
+                    .addComponent(encode);
+            result.readerIndex(0)
+                    .writerIndex(4 + compressedSize);

Review Comment:
   use `.addComponent(true, szBuf).addComponent(true, encode)` so that there 
isn't a need to set the writerIndex manually.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -558,76 +591,193 @@ protected void recoverFromLedger(final ManagedCursorInfo 
info, final VoidCallbac
 
             // Read the last entry in the ledger
             long lastEntryInLedger = lh.getLastAddConfirmed();
+            recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger);
+        };
+
+        try {
+            bookkeeper.asyncOpenLedger(ledgerId, digestType, 
getConfig().getPassword(), openCallback,
+                    null);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered error on opening cursor ledger {} for 
cursor {}",
+                ledger.getName(), ledgerId, name, t);
+            
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, 
null);
+        }
+    }
+
+    private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
+                                            VoidCallback callback,
+                                            LedgerHandle lh,
+                                            long entryId) {
+        long ledgerId = lh.getId();
+
+        if (entryId < 0) {
+            log.warn("[{}] Error reading from metadata ledger {} for cursor 
{}: No valid entries in ledger",
+                    ledger.getName(), ledgerId, name);
+            // Rewind to last cursor snapshot available
+            initialize(getRollbackPosition(info), Collections.emptyMap(), 
cursorProperties, callback);
+            return;
+        }
 
-            if (lastEntryInLedger < 0) {
-                log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: No entries in ledger",
-                        ledger.getName(), ledgerId, name);
-                // Rewind to last cursor snapshot available
+        lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
+            }
+            if (isBkErrorNotRecoverable(rc1)) {
+                log.error("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
+                        ledgerId, name, BKException.getMessage(rc1));
+                // Rewind to oldest entry available
                 initialize(getRollbackPosition(info), Collections.emptyMap(), 
cursorProperties, callback);
                 return;
-            }
+            } else if (rc1 != BKException.Code.OK) {
+                log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
+                        ledgerId, name, BKException.getMessage(rc1));
 
-            lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, 
lh1, seq, ctx1) -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
-                }
-                if (isBkErrorNotRecoverable(rc1)) {
-                    log.error("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
-                            ledgerId, name, BKException.getMessage(rc1));
-                    // Rewind to oldest entry available
-                    initialize(getRollbackPosition(info), 
Collections.emptyMap(), cursorProperties, callback);
-                    return;
-                } else if (rc1 != BKException.Code.OK) {
-                    log.warn("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
-                            ledgerId, name, BKException.getMessage(rc1));
+                callback.operationFailed(createManagedLedgerException(rc1));
+                return;
+            }
 
-                    
callback.operationFailed(createManagedLedgerException(rc1));
-                    return;
+            LedgerEntry entry = seq.nextElement();
+            ByteBuf data = entry.getEntryBuffer();
+            try {
+                ChunkSequenceFooter chunkSequenceFooter = 
parseChunkSequenceFooter(data);
+                if (chunkSequenceFooter.numParts > 0) {
+                    readChunkSequence(callback, lh, entryId, 
chunkSequenceFooter);
+                } else {
+                    Throwable res = tryCompleteCursorRecovery(lh, data);
+                    if (res == null) {
+                        callback.operationComplete();
+                    } else {
+                        log.warn("[{}] Error recovering from metadata ledger 
{} entry {} for cursor {}. "
+                                        + "Will try recovery from previous 
entry.",
+                                ledger.getName(), ledgerId, entryId, name, 
res);
+                        //try recovery from previous entry
+                        recoverFromLedgerByEntryId(info, callback, lh, entryId 
- 1);
+                    }
                 }
+            } catch (IOException error) {
+                log.error("Cannot parse footer", error);
+                log.warn("[{}] Error recovering from metadata ledger {} entry 
{} for cursor {}, cannot parse footer. "
+                                + "Will try recovery from previous entry.",
+                        ledger.getName(), ledgerId, entryId, name, error);
+                recoverFromLedgerByEntryId(info, callback, lh, entryId - 1);
+            }
+        }, null);
+    }
 
-                LedgerEntry entry = seq.nextElement();
-                mbean.addReadCursorLedgerSize(entry.getLength());
-                PositionInfo positionInfo;
-                try {
-                    positionInfo = PositionInfo.parseFrom(entry.getEntry());
-                } catch (InvalidProtocolBufferException e) {
-                    callback.operationFailed(new ManagedLedgerException(e));
-                    return;
-                }
+    private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
+                                   long footerPosition, ChunkSequenceFooter 
chunkSequenceFooter) {
+        long startPos = footerPosition - chunkSequenceFooter.numParts;
+        long endPos = footerPosition - 1;
+        log.info("readChunkSequence from pos {}, num parts {}, startPos {}, 
endPos {}",
+                footerPosition, chunkSequenceFooter.numParts, startPos, 
endPos);
+        lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() 
{
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> entries, Object ctx) {
+                CompositeByteBuf buffer = 
PulsarByteBufAllocator.DEFAULT.compositeBuffer();
 
-                Map<String, Long> recoveredProperties = Collections.emptyMap();
-                if (positionInfo.getPropertiesCount() > 0) {
-                    // Recover properties map
-                    recoveredProperties = new HashMap<>();
-                    for (int i = 0; i < positionInfo.getPropertiesCount(); 
i++) {
-                        LongProperty property = positionInfo.getProperties(i);
-                        recoveredProperties.put(property.getName(), 
property.getValue());
+                AtomicInteger readableBytes = new AtomicInteger(0);
+                entries.asIterator().forEachRemaining(entry -> {
+                    if (log.isInfoEnabled()) {
+                        log.debug("pos {} len {} bytes ", entry.getEntryId(), 
entry.getLength());
                     }
+                    ByteBuf part = entry.getEntryBuffer();
+                    buffer.addComponent(part);
+                    readableBytes.addAndGet(part.readableBytes());
+                });
+                buffer.writerIndex(readableBytes.get())
+                        .readerIndex(0);

Review Comment:
   instead of adjusting the writerIndex manually, `buffer.addComponent(true, 
part)` could be a way to achieve this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to