lhotari commented on code in PR #24945:
URL: https://github.com/apache/pulsar/pull/24945#discussion_r2498036184


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -269,45 +260,130 @@ public long getCommittedTxnCount() {
 
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
-        // Method `takeAbortedTxnsSnapshot` will be executed in the different 
thread.
-        // So we need to retain the buffer in this thread. It will be released 
after message persistent.
-        buffer.retain();
-        CompletableFuture<Position> future = 
getPublishFuture().thenCompose(ignore -> {
-            if (checkIfNoSnapshot()) {
-                CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                // `publishFuture` will be completed after message persistent, 
so there will not be two threads
-                // writing snapshots at the same time.
-                
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() 
-> {
-                    if (changeToReadyStateFromNoSnapshot()) {
-                        timer.newTimeout(TopicTransactionBuffer.this,
-                                takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
-                        completableFuture.complete(null);
-                    } else {
-                        log.error("[{}]Failed to change state of transaction 
buffer to Ready from NoSnapshot",
-                                topic.getName());
-                        completableFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(
-                                "Transaction Buffer take first snapshot 
failed, the current state is: " + getState()));
-                    }
-                }).exceptionally(exception -> {
-                    log.error("Topic {} failed to take snapshot", 
this.topic.getName());
-                    completableFuture.completeExceptionally(exception);
-                    return null;
-                });
-                return completableFuture.thenCompose(__ -> 
internalAppendBufferToTxn(txnId, buffer));
-            } else if (checkIfReady()) {
-                return internalAppendBufferToTxn(txnId, buffer);
-            } else {
-                // `publishFuture` will be completed after transaction buffer 
recover completely
-                // during initializing, so this case should not happen.
+        synchronized (pendingAppendingTxnBufferTasks) {
+            // The first snapshot is in progress, the following publish tasks 
will be pending.
+            if (!pendingAppendingTxnBufferTasks.isEmpty()) {
+                CompletableFuture<Position> res = new CompletableFuture<>();
+                buffer.retain();
+                pendingAppendingTxnBufferTasks.offer(new 
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+                return res;
+            }
+
+            // `publishFuture` will be completed after transaction buffer 
recover completely
+            // during initializing, so this case should not happen.
+            if (!checkIfReady() && !checkIfNoSnapshot() && 
!checkIfFirstSnapshotting()) {
                 return FutureUtil.failedFuture(new 
BrokerServiceException.ServiceUnitNotReadyException(
                         "Transaction Buffer recover failed, the current state 
is: " + getState()));
             }
-        }).whenComplete(((position, throwable) -> buffer.release()));
-        setPublishFuture(future);
-        return future;
+
+            // The transaction buffer is ready to write.
+            if (checkIfReady()) {
+                return internalAppendBufferToTxn(txnId, buffer, sequenceId);
+            }
+
+            // Pending the current publishing and trigger new snapshot if 
needed.
+            CompletableFuture<Position> res = new CompletableFuture<>();
+            buffer.retain();
+            pendingAppendingTxnBufferTasks.offer(new 
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+            // Trigger the first snapshot.
+            getTransactionBufferFuture().whenComplete((ignore1, ex1) -> {
+                PendingAppendingTxnBufferTask pendingTask1 = null;
+                if (ex1 != null) {
+                    synchronized (pendingAppendingTxnBufferTasks) {
+                        while ((pendingTask1 = 
pendingAppendingTxnBufferTasks.poll()) != null) {
+                            pendingTask1.getBuffer().release();
+                            
pendingTask1.getPendingPublishFuture().completeExceptionally(ex1);
+                        }
+                        return;
+                    }
+                }
+                if (changeToFirstSnapshotting()) {
+                    log.info("[{}] Start to take the first snapshot", 
topic.getName());
+                    // Flush pending publishing after the first snapshot 
finished.
+                    takeFirstSnapshot().whenComplete((ignore2, ex2) -> {
+                        if (ex2 != null) {
+                            log.error("[{}] Failed to take the first snapshot, 
flushing failed publishing requests",
+                                    topic.getName(), ex2);
+                            synchronized (pendingAppendingTxnBufferTasks) {
+                                PendingAppendingTxnBufferTask pendingTask2 = 
null;
+                                while ((pendingTask2 = 
pendingAppendingTxnBufferTasks.poll()) != null) {
+                                    pendingTask2.getBuffer().release();
+                                    
pendingTask2.getPendingPublishFuture().completeExceptionally(ex2);
+                                }
+                                return;
+                            }
+                        }
+                        log.info("[{}] Finished to take the first snapshot, 
flushing publishing {} requests",
+                                topic.getName(), 
pendingAppendingTxnBufferTasks.size());
+                        PendingAppendingTxnBufferTask pendingTask2 = null;
+                        try {
+                            synchronized (pendingAppendingTxnBufferTasks) {
+                                while ((pendingTask2 = 
pendingAppendingTxnBufferTasks.poll()) != null) {
+                                    final ByteBuf data = 
pendingTask2.getBuffer();
+                                    // Method `internalAppendBufferToTxn` will 
be executed in the different thread.
+                                    // So we need to retain the buffer in this 
thread. It will be released after message
+                                    // persistent.
+                                    final CompletableFuture<Position> 
pendingFuture =
+                                            
pendingTask2.getPendingPublishFuture();
+                                    
internalAppendBufferToTxn(pendingTask2.getTxnId(), pendingTask2.getBuffer(),
+                                            pendingTask2.getSequenceId())
+                                            .whenComplete((positionAdded, ex3) 
-> {
+                                        if (ex3 != null) {
+                                            
pendingFuture.completeExceptionally(ex3);
+                                            return;

Review Comment:
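   Is a `data.release()` call missing here? The buffer was retained (`buffer.retain()`) when this task was queued, but this error branch only completes `pendingFuture` exceptionally and returns, so the retained buffer looks like it could leak on this path.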



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to