congbobo184 commented on code in PR #16707:
URL: https://github.com/apache/pulsar/pull/16707#discussion_r926446584
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -104,6 +126,11 @@ public MLPendingAckStore(ManagedLedger managedLedger,
ManagedCursor cursor,
this.subManagedCursor = subManagedCursor;
this.logIndexBackoff = new
LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
this.maxIndexLag = logIndexBackoff.next(0);
+ this.bufferedWriter = new TxnLogBufferedWriter(managedLedger,
((ManagedLedgerImpl) managedLedger).getExecutor(),
+ scheduledExecutorService, PendingAckLogSerializer.INSTANCE,
+ bufferedWriterConfig.getBatchedWriteMaxRecords(),
bufferedWriterConfig.getBatchedWriteMaxSize(),
+ bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(),
bufferedWriterConfig.isBatchEnabled());
+ this.batchedPendingAckLogsWaitingForHandle = new
ArrayList<>(bufferedWriterConfig.getBatchedWriteMaxRecords());
Review Comment:
We don't need to set the size, we can wait for it to automatically expand
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
if (exception instanceof
ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
- buf.release();
completableFuture.completeExceptionally(new
PersistenceException(exception));
}
}, null);
return completableFuture;
}
+ /**
+ * Build the index mapping of Transaction pending ack log (aka t-log) and
Topic message log (aka m-log).
+ * When m-log has been ack, t-log which holds m-log is no longer useful,
this method builder the mapping of them.
+ *
+ * If a Ledger Entry has many t-log, we only need to care about the record
that carries the largest acknowledgement
+ * info. Because all Commit/Abort log after this record describes behavior
acknowledgement, if the behavior
+ * acknowledgement has been handle correct, these Commit/Abort log is no
longer useful.
+ * @param logPosition The position of batch log Entry.
+ * @param logList Pending ack log records in a batch log Entry.
+ */
+ private void handleMetadataEntry(PositionImpl logPosition,
List<PendingAckMetadataEntry> logList) {
+ // Find the record that carries the largest ack info, and call
"handleMetadataEntry(position, pendingAckLog)"
+ PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+ PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+ for (int i = logList.size() - 1; i >= 0; i--){
+ PendingAckMetadataEntry pendingAckLog = logList.get(i);
+ if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+ && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT)
{
Review Comment:
```suggestion
if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
|| pendingAckLog.getPendingAckOp() ==
PendingAckOp.COMMIT) {
```
it is very similar to
https://github.com/apache/pulsar/blob/4483df2b3d113bfb55a8bb1a9e6c77160257c475/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java#L253-L254
may be we can advance logic
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
if (exception instanceof
ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
- buf.release();
completableFuture.completeExceptionally(new
PersistenceException(exception));
}
}, null);
return completableFuture;
}
+ /**
+ * Build the index mapping of Transaction pending ack log (aka t-log) and
Topic message log (aka m-log).
+ * When m-log has been ack, t-log which holds m-log is no longer useful,
this method builder the mapping of them.
+ *
+ * If a Ledger Entry has many t-log, we only need to care about the record
that carries the largest acknowledgement
+ * info. Because all Commit/Abort log after this record describes behavior
acknowledgement, if the behavior
+ * acknowledgement has been handle correct, these Commit/Abort log is no
longer useful.
+ * @param logPosition The position of batch log Entry.
+ * @param logList Pending ack log records in a batch log Entry.
+ */
+ private void handleMetadataEntry(PositionImpl logPosition,
List<PendingAckMetadataEntry> logList) {
+ // Find the record that carries the largest ack info, and call
"handleMetadataEntry(position, pendingAckLog)"
+ PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+ PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+ for (int i = logList.size() - 1; i >= 0; i--){
+ PendingAckMetadataEntry pendingAckLog = logList.get(i);
+ if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+ && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT)
{
+ continue;
+ }
+ if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+ continue;
+ }
+ for (PendingAckMetadata ack :
pendingAckLog.getPendingAckMetadatasList()){
+ if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(),
ack.getEntryId()) < 0){
+ maxAcknowledgementPosition =
PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+ pendingAckLogHasMaxAckPosition = pendingAckLog;
+ }
+ }
Review Comment:
this logic is very similar to
https://github.com/apache/pulsar/blob/4483df2b3d113bfb55a8bb1a9e6c77160257c475/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java#L255-L257
may we can handle once
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -332,14 +408,41 @@ public void run() {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
&& fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
- ByteBuf buffer = entry.getDataBuffer();
currentLoadPosition =
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
- PendingAckMetadataEntry pendingAckMetadataEntry = new
PendingAckMetadataEntry();
- pendingAckMetadataEntry.parseFrom(buffer,
buffer.readableBytes());
- currentIndexLag.incrementAndGet();
- handleMetadataEntry(new
PositionImpl(entry.getLedgerId(), entry.getEntryId()),
- pendingAckMetadataEntry);
-
pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
+ List<PendingAckMetadataEntry> logs =
deserializeEntry(entry);
+ if (logs.isEmpty()){
+ continue;
+ } else if (logs.size() == 1){
+ currentIndexLag.incrementAndGet();
+ PendingAckMetadataEntry log = logs.get(0);
+ handleMetadataEntry(new
PositionImpl(entry.getLedgerId(), entry.getEntryId()), log);
+ pendingAckReplyCallBack.handleMetadataEntry(log);
+ } else {
+ /**
+ * 1. Query batch index of current entry from
cursor.
+ * 2. Filter the data which has already ack.
+ * 3. Build batched position and handle valid data.
+ */
+ long[] ackSetAlreadyAck =
cursor.getDeletedBatchIndexesAsLongArray(
+ PositionImpl.get(entry.getLedgerId(),
entry.getEntryId()));
+ BitSetRecyclable bitSetAlreadyAck = null;
+ if (ackSetAlreadyAck != null){
+ bitSetAlreadyAck =
BitSetRecyclable.valueOf(ackSetAlreadyAck);
+ }
Review Comment:
pending ack clear log data is use markDelete, so we don't need this logic
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]