poorbarcode commented on code in PR #20559:
URL: https://github.com/apache/pulsar/pull/20559#discussion_r1312688627


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java:
##########
@@ -51,5 +52,5 @@ static TransactionBufferProvider newProvider(String providerClassName) throws IO
      * @param originTopic
      * @return
      */
-    TransactionBuffer newTransactionBuffer(Topic originTopic);
+    TransactionBuffer newTransactionBuffer(Topic originTopic, PositionImpl startUsedPosition);

Review Comment:
   `TransactionBufferProvider` is a public API, and users can plug in their own 
implementation through the `transactionBufferProviderClassName` config. Changing 
the signature of the existing method would break those implementations, so we 
can't modify it; adding a new default method is better.
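
   A minimal sketch of what I mean, not the actual change. `Topic`, 
`PositionImpl`, and `TransactionBuffer` below are simplified stand-ins for the 
real Pulsar types, and `describe()`, `LegacyProvider`, and 
`PositionAwareProvider` are hypothetical names used only for illustration. The 
original abstract method stays as it is, and the new overload is a default 
method that delegates to it:

```java
// Simplified stand-ins for the Pulsar types named in the diff (assumptions).
final class Topic {
    final String name;
    Topic(String name) { this.name = name; }
}

final class PositionImpl {
    final long ledgerId;
    final long entryId;
    PositionImpl(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

interface TransactionBuffer {
    // Hypothetical method, present only so the sketch can show which path ran.
    String describe();
}

interface TransactionBufferProvider {
    // Original abstract method, left untouched so existing providers still compile.
    TransactionBuffer newTransactionBuffer(Topic originTopic);

    // New overload as a default method: providers that do not override it
    // fall back to the original method and ignore the position.
    default TransactionBuffer newTransactionBuffer(Topic originTopic, PositionImpl startUsedPosition) {
        return newTransactionBuffer(originTopic);
    }
}

// A user-supplied provider written against the old interface (hypothetical).
final class LegacyProvider implements TransactionBufferProvider {
    @Override
    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
        return () -> "legacy:" + originTopic.name;
    }
}

// A provider that opts in to the new overload (hypothetical).
final class PositionAwareProvider implements TransactionBufferProvider {
    @Override
    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
        return newTransactionBuffer(originTopic, null);
    }

    @Override
    public TransactionBuffer newTransactionBuffer(Topic originTopic, PositionImpl startUsedPosition) {
        String pos = startUsedPosition == null
                ? "none"
                : startUsedPosition.ledgerId + ":" + startUsedPosition.entryId;
        return () -> "aware:" + originTopic.name + "@" + pos;
    }
}
```

   With this shape, a provider that only implements the one-argument method 
keeps working when the broker calls the two-argument overload, and only 
implementations that care about the position need to override it.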



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -250,7 +231,54 @@ public long getCommittedTxnCount() {
     }
 
     @Override
+/**
+ * If it's the first time using the transaction buffer, the method records the max read position of the topic
+ * to metadata of the topic at the time of the first write. If the transaction buffer has been used before,
+ * it writes the message after recovering the transaction buffer.
+ */
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+        if (checkIfReady()) {
+            // Transaction buffer is ready, directly append the buffer
+            return internalAppendBufferToTxn(txnId, sequenceId, buffer);
+        } else if (changeToInitializingState()) {
+            // Transition to initializing state and record max read position if it's the first usage
+            Position firstMaxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
+            if (managedLedger.getProperties().get(MAX_READ_POSITION) == null) {
+                HashMap<String, String> hashMap = new HashMap<>();
+                hashMap.put(MAX_READ_POSITION, firstMaxReadPosition.toString());
+                managedLedger.asyncSetProperties(hashMap, new AsyncCallbacks.UpdatePropertiesCallback() {
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        changeToReadyState();
+                        transactionBufferFuture.complete(null);
+                    }
+
+                    @Override
+                    public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                        log.error("Failed to set first max read position to topic {}", topic.getName(), exception);
+                        changeToCloseState();
+                        transactionBufferFuture.completeExceptionally(exception);
+                    }
+                }, null);
+            }
+        }
+        // Return a CompletableFuture that will complete after recovering the transaction buffer,
+        // then append the buffer
+        return transactionBufferFuture.thenCompose(ignore -> internalAppendBufferToTxn(txnId, sequenceId, buffer));
+    }
+
+    private PositionImpl getPositionFromString(String positionStr) {

Review Comment:
   The method `getPositionFromString` does not seem to be needed. Should we 
remove it?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1459,61 +1459,48 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                     });
 
                     schemaVersionFuture.thenAccept(schemaVersion -> {
-                        topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {

Review Comment:
   The method `checkIfTransactionBufferRecoverCompletely` triggers an event that 
persists a snapshot once, but with this change no snapshot is persisted on the 
first send with a transaction.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -66,6 +69,8 @@
 @Slf4j
 public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer, TimerTask {
 
+    public static final String MAX_READ_POSITION = "maxReadPosition";

Review Comment:
   The added property does not represent the max read position of the topic, 
right? It is the position at which a message was first sent with a transaction.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -250,7 +231,54 @@ public long getCommittedTxnCount() {
     }
 
     @Override
+/**
+ * If it's the first time using the transaction buffer, the method records the max read position of the topic
+ * to metadata of the topic at the time of the first write. If the transaction buffer has been used before,
+ * it writes the message after recovering the transaction buffer.
+ */
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+        if (checkIfReady()) {
+            // Transaction buffer is ready, directly append the buffer
+            return internalAppendBufferToTxn(txnId, sequenceId, buffer);
+        } else if (changeToInitializingState()) {
+            // Transition to initializing state and record max read position if it's the first usage
+            Position firstMaxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
+            if (managedLedger.getProperties().get(MAX_READ_POSITION) == null) {
+                HashMap<String, String> hashMap = new HashMap<>();
+                hashMap.put(MAX_READ_POSITION, firstMaxReadPosition.toString());
+                managedLedger.asyncSetProperties(hashMap, new AsyncCallbacks.UpdatePropertiesCallback() {
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        changeToReadyState();

Review Comment:
   Should we start the scheduled task that persists the snapshot here? Please 
also add a test for this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
