lhotari commented on code in PR #22799: URL: https://github.com/apache/pulsar/pull/22799#discussion_r1770803012
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java: ########## @@ -558,76 +590,178 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // Read the last entry in the ledger long lastEntryInLedger = lh.getLastAddConfirmed(); + recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger); + }; + + try { + bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, + null); + } catch (Throwable t) { + log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", + ledger.getName(), ledgerId, name, t); + openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null); + } + } + + private void recoverFromLedgerByEntryId(ManagedCursorInfo info, + VoidCallback callback, + LedgerHandle lh, + long entryId) { + long ledgerId = lh.getId(); + + if (entryId < 0) { + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No valid entries in ledger", + ledger.getName(), ledgerId, name); + // Rewind to last cursor snapshot available + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); + return; + } - if (lastEntryInLedger < 0) { - log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger", - ledger.getName(), ledgerId, name); - // Rewind to last cursor snapshot available + lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> { + if (log.isDebugEnabled()) { + log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); + } + if (isBkErrorNotRecoverable(rc1)) { + log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), + ledgerId, name, BKException.getMessage(rc1)); + // Rewind to oldest entry available initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; + } else if (rc1 != BKException.Code.OK) { + log.warn("[{}] Error 
reading from metadata ledger {} for cursor {}: {}", ledger.getName(), + ledgerId, name, BKException.getMessage(rc1)); + + callback.operationFailed(createManagedLedgerException(rc1)); + return; } - lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> { - if (log.isDebugEnabled()) { - log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); + LedgerEntry entry = seq.nextElement(); + byte[] data = entry.getEntry(); Review Comment: Please replace this with the use of `getEntryBuffer` so that Netty `ByteBuf` is used instead of `byte[]`. Large arrays add significant GC overhead and that's why Netty `ByteBuf` is preferred. It's better to refactor the logic to operate with Netty ByteBufs. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java: ########## @@ -3123,47 +3321,220 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio } } + private void buildBatchEntryDeletionIndexInfoList( + PositionInfoUtils.BatchedEntryDeletionIndexInfoConsumer consumer) { + if (!getConfig().isDeletionAtBatchIndexLevelEnabled()) { + return; + } + int maxBatchDeletedIndexToPersist = getConfig().getMaxBatchDeletedIndexToPersist(); + lock.readLock().lock(); + try { + if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) { + return; + } + int count = 0; + Iterator<Map.Entry<Position, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator(); + while (iterator.hasNext() && count < maxBatchDeletedIndexToPersist) { + Map.Entry<Position, BitSetRecyclable> entry = iterator.next(); + long[] array = entry.getValue().toLongArray(); + consumer.acceptRange(entry.getKey().getLedgerId(), entry.getKey().getEntryId(), array); + count++; + } + } finally { + lock.readLock().unlock(); + } + } + void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { + 
checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize mus be greater than zero"); + long now = System.nanoTime(); Position position = mdEntry.newPosition; - PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId()) - .setEntryId(position.getEntryId()) - .addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()) - .addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()) - .addAllProperties(buildPropertiesMap(mdEntry.properties)).build(); - if (log.isDebugEnabled()) { log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(), position); } requireNonNull(lh); - byte[] data = pi.toByteArray(); - lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> { - if (rc == BKException.Code.OK) { + ByteBuf rawData = PositionInfoUtils.serializePositionInfo(mdEntry, position, + this::scanIndividualDeletedMessageRanges, this::buildBatchEntryDeletionIndexInfoList, + lastSerializedSize); + long endSer = System.nanoTime(); + this.lastSerializedSize = rawData.readableBytes(); + + // rawData is released by compressDataIfNeeded if needed + ByteBuf data = compressDataIfNeeded(rawData, lh); + + long endCompress = System.nanoTime(); + + int offset = 0; + final int len = data.readableBytes(); + int numParts = 1 + (len / maxPositionChunkSize); + + if (log.isDebugEnabled()) { + log.debug("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {}", + ledger.getName(), name, lh.getId(), + position, len, numParts); + } + log.info("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, " + + "numParts {}, serializeTime {} ms" + + " compressTime {} ms, total {} ms", ledger.getName(), name, lh.getId(), + position, len, numParts, + (endSer - now) / 1000000, + (endCompress - endSer) / 1000000, (endCompress - now) / 1000000); + + if (numParts == 1) { + // no need for chunking + // asyncAddEntry will release data ByteBuf + writeToBookKeeperLastChunk(lh, mdEntry, 
callback, data, len, position, () -> {}); + } else { + // chunking + int part = 0; + while (part != numParts) { + int remaining = len - offset; + int currentLen = Math.min(maxPositionChunkSize, remaining); + boolean isLast = part == numParts - 1; + if (log.isDebugEnabled()) { - log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position, - lh1.getId()); + log.debug("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {} " + + "part {} offset {} len {}", + ledger.getName(), name, lh.getId(), + position, len, numParts, part, offset, currentLen); } - rolloverLedgerIfNeeded(lh1); + // just send the addEntry, BK client guarantees that each entry succeeds only if all + // the previous entries succeeded + // asyncAddEntry takes ownership of the buffer + lh.asyncAddEntry(data.retainedSlice(offset, currentLen), (rc, lh1, entryId, ctx) -> { + }, null); - mbean.persistToLedger(true); - mbean.addWriteCursorLedgerSize(data.length); - callback.operationComplete(); - } else { - log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name, - position, lh1.getId(), BKException.getMessage(rc)); - // If we've had a write error, the ledger will be automatically closed, we need to create a new one, - // in the meantime the mark-delete will be queued. 
- STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); + if (isLast) { + // last, send a footer with the number of parts + ChunkSequenceFooter footer = new ChunkSequenceFooter(numParts, len); + byte[] footerData; + try { + footerData = ObjectMapperFactory.getMapper() + .getObjectMapper().writeValueAsBytes(footer); + } catch (JsonProcessingException e) { + // this is almost impossible to happen + log.error("Cannot serialize footer {}", footer); + return; + } + // need to explicitly release data ByteBuf + writeToBookKeeperLastChunk(lh, mdEntry, callback, + Unpooled.wrappedBuffer(footerData), len, position, data::release); + } + offset += currentLen; + part++; + } + } + } - // Before giving up, try to persist the position in the metadata store. - persistPositionToMetaStore(mdEntry, callback); + + private void writeToBookKeeperLastChunk(LedgerHandle lh, + MarkDeleteEntry mdEntry, + VoidCallback callback, + ByteBuf data, + int totalLength, + Position position, + Runnable onFinished) { + lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> { + try { + if (rc == BKException.Code.OK) { + if (log.isDebugEnabled()) { + log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, + position, + lh1.getId()); + } + + rolloverLedgerIfNeeded(lh1); + + mbean.persistToLedger(true); + mbean.addWriteCursorLedgerSize(totalLength); + callback.operationComplete(); + } else { + log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name, + position, lh1.getId(), BKException.getMessage(rc)); + // If we've had a write error, the ledger will be automatically closed, we need to create a new one, + // in the meantime the mark-delete will be queued. + STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); + + // Before giving up, try to persist the position in the metadata store. 
+ persistPositionToMetaStore(mdEntry, callback); + } + } finally { + onFinished.run(); } }, null); } + private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) { + byte[] pulsarCursorInfoCompression = + lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE); + if (pulsarCursorInfoCompression == null) { + return data; + } + + try { + int uncompressedSize = data.readableBytes(); + String pulsarCursorInfoCompressionString = new String(pulsarCursorInfoCompression); + CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec( + CompressionType.valueOf(pulsarCursorInfoCompressionString)); + ByteBuf encode = compressionCodec.encode(data); + + int compressedSize = encode.readableBytes(); + + ByteBuf szBuf = PulsarByteBufAllocator.DEFAULT.buffer(4).writeInt(uncompressedSize); + + CompositeByteBuf result = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2); + result.addComponent(szBuf) + .addComponent(encode); + result.readerIndex(0) + .writerIndex(4 + compressedSize); + + int ratio = (int) (compressedSize * 100.0 / uncompressedSize); + log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)", + ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString, uncompressedSize, ratio); + return result; + } finally { + data.release(); + } + } + + static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) { Review Comment: Refactor this to use Netty ByteBufs instead of `byte[]` ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java: ########## @@ -558,76 +590,178 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // Read the last entry in the ledger long lastEntryInLedger = lh.getLastAddConfirmed(); + recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger); + }; + + try { + bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, + null); + } catch 
(Throwable t) { + log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", + ledger.getName(), ledgerId, name, t); + openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null); + } + } + + private void recoverFromLedgerByEntryId(ManagedCursorInfo info, + VoidCallback callback, + LedgerHandle lh, + long entryId) { + long ledgerId = lh.getId(); + + if (entryId < 0) { + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No valid entries in ledger", + ledger.getName(), ledgerId, name); + // Rewind to last cursor snapshot available + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); + return; + } - if (lastEntryInLedger < 0) { - log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger", - ledger.getName(), ledgerId, name); - // Rewind to last cursor snapshot available + lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> { + if (log.isDebugEnabled()) { + log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); + } + if (isBkErrorNotRecoverable(rc1)) { + log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), + ledgerId, name, BKException.getMessage(rc1)); + // Rewind to oldest entry available initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; + } else if (rc1 != BKException.Code.OK) { + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), + ledgerId, name, BKException.getMessage(rc1)); + + callback.operationFailed(createManagedLedgerException(rc1)); + return; } - lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> { - if (log.isDebugEnabled()) { - log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); + LedgerEntry entry = seq.nextElement(); + byte[] data = entry.getEntry(); + try { + 
ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data); + if (chunkSequenceFooter.numParts > 0) { + readChunkSequence(callback, lh, entryId, chunkSequenceFooter); + } else { + Throwable res = tryCompleteCursorRecovery(lh, data); + if (res == null) { + callback.operationComplete(); + } else { + log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}. " + + "Will try recovery from previous entry.", + ledger.getName(), ledgerId, entryId, name, res); + //try recovery from previous entry + recoverFromLedgerByEntryId(info, callback, lh, entryId - 1); + } } - if (isBkErrorNotRecoverable(rc1)) { - log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), - ledgerId, name, BKException.getMessage(rc1)); - // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); - return; - } else if (rc1 != BKException.Code.OK) { - log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), - ledgerId, name, BKException.getMessage(rc1)); + } catch (IOException error) { + log.error("Cannot parse footer", error); + log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}, cannot parse footer. 
" + + "Will try recovery from previous entry.", + ledger.getName(), ledgerId, entryId, name, error); + recoverFromLedgerByEntryId(info, callback, lh, entryId - 1); + } + }, null); + } - callback.operationFailed(createManagedLedgerException(rc1)); - return; + private void readChunkSequence(VoidCallback callback, LedgerHandle lh, + long footerPosition, ChunkSequenceFooter chunkSequenceFooter) { + long startPos = footerPosition - chunkSequenceFooter.numParts; + long endPos = footerPosition - 1; + log.info("readChunkSequence from pos {}, num parts {}, startPos {}, endPos {}", + footerPosition, chunkSequenceFooter.numParts, startPos, endPos); + lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); Review Comment: `ByteArrayOutputStream` adds a lot of GC overhead compared to the usage of Netty ByteBufs. Please refactor this to use Netty ByteBufs instead of using ByteArrayOutputStream. (btw. There are also a lot of gotchas when using Netty ByteBufs; I happened to learn quite a few when working on optimizing https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java . The gotchas mainly apply when generating very large response buffers, as is the case with metrics. The Prometheus metrics results might be 500MB of text in certain worst cases with topic level metrics enabled in brokers. In this case we wouldn't have to be concerned about those challenges with Netty ByteBufs since I guess the size of the output isn't in that range.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org