codelipenghui commented on a change in pull request #13209:
URL: https://github.com/apache/pulsar/pull/13209#discussion_r766489831



##########
File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
##########
@@ -18,46 +18,87 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import 
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover 
transaction log.
  */
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = 
LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);

Review comment:
       The getter method should return a `long` rather than the `AtomicLong` itself.

##########
File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
##########
@@ -18,46 +18,87 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import 
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover 
transaction log.
  */
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = 
LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
     public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
-        return null;
+        return op;
     }
 
+    // When all of ledger have been deleted, we will generate sequenceId from 
managedLedger properties
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> 
propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
 
+        if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+            
sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+        }
     }
 
+    // When we don't roll over ledger, we can init sequenceId from the 
getLastAddConfirmed transaction metadata entry
     @Override
-    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle ledgerHandle) {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = 
entries.getEntry(lh.getLastAddConfirmed());

Review comment:
       The `ledgerEntry` should be closed after getting the max local txn ID.

##########
File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
##########
@@ -18,46 +18,87 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import 
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover 
transaction log.
  */
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = 
LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
     public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
-        return null;
+        return op;
     }
 
+    // When all of ledger have been deleted, we will generate sequenceId from 
managedLedger properties
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> 
propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
 
+        if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+            
sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+        }
     }
 
+    // When we don't roll over ledger, we can init sequenceId from the 
getLastAddConfirmed transaction metadata entry
     @Override
-    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle ledgerHandle) {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = 
entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                TransactionMetadataEntry lastConfirmEntry = 
new TransactionMetadataEntry();
+                                ByteBuf buffer = ledgerEntry.getEntryBuffer();

Review comment:
       The buffer should be released after getting the max local txn ID.

##########
File path: 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
##########
@@ -42,10 +42,14 @@
                                                                  
ManagedLedgerConfig managedLedgerConfig,
                                                                  
TransactionTimeoutTracker timeoutTracker,
                                                                  
TransactionRecoverTracker recoverTracker) {
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(new 
MLTransactionLogInterceptor());
         MLTransactionLogImpl txnLog = new 
MLTransactionLogImpl(transactionCoordinatorId,
                 managedLedgerFactory, managedLedgerConfig);
 
+        // MLTransactionLogInterceptor will init sequenceId and update the 
sequenceId to managedLedger properties.
         return txnLog.initialize().thenApply(__ ->
-                new MLTransactionMetadataStore(transactionCoordinatorId, 
txnLog, timeoutTracker, recoverTracker));
+                new MLTransactionMetadataStore(transactionCoordinatorId, 
txnLog, timeoutTracker,
+                        recoverTracker, 
mlTransactionLogInterceptor.getSequenceId()));

Review comment:
       It's better to introduce a method on `mlTransactionLogInterceptor` for 
modifying the sequence ID; sharing an AtomicLong across multiple instances is 
not good for maintainability.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to