This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e88da2a Fix transaction log append sync problem. (#9238)
e88da2a is described below
commit e88da2ad26608f102845dece1dd84301c5fb6340
Author: congbo <[email protected]>
AuthorDate: Wed Mar 3 07:51:30 2021 +0800
Fix transaction log append sync problem. (#9238)
## Motivation
When the transaction status become aborted or committed the cursor may not
delete the position, if not lock transaction operation and add append log
position.
If the txnMetaImpl have recycle, the same time we update the txnStatus in
this recycle one we will produce the error behavior.
---
.../impl/InMemTransactionMetadataStore.java | 2 +-
.../impl/MLTransactionMetadataStore.java | 30 +++++++++++---------
.../transaction/coordinator/impl/TxnMetaImpl.java | 33 ++--------------------
3 files changed, 21 insertions(+), 44 deletions(-)
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 68d1c0e..1b588c7 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -65,7 +65,7 @@ class InMemTransactionMetadataStore implements
TransactionMetadataStore {
tcID.getId(),
localID.getAndIncrement()
);
- TxnMetaImpl txn = TxnMetaImpl.create(txnID);
+ TxnMetaImpl txn = new TxnMetaImpl(txnID);
transactions.put(txnID, txn);
return CompletableFuture.completedFuture(txnID);
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 227b4c4..f86e566 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -106,7 +106,7 @@ public class MLTransactionMetadataStore
} else {
List<Position> positions = new ArrayList<>();
positions.add(position);
- txnMetaMap.put(txnID,
MutablePair.of(TxnMetaImpl.create(txnID), positions));
+ txnMetaMap.put(txnID, MutablePair.of(new
TxnMetaImpl(txnID), positions));
txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(),
transactionMetadataEntry.getTimeoutMs());
@@ -140,7 +140,6 @@ public class MLTransactionMetadataStore
transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v ->
{
TxnMeta txnMeta =
txnMetaMap.remove(txnID).getLeft();
txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
- ((TxnMetaImpl) txnMeta).recycle();
});
} else {
txnMetaMap.get(txnID).getLeft()
@@ -200,7 +199,7 @@ public class MLTransactionMetadataStore
.setLastModificationTime(currentTimeMillis);
return transactionLog.append(transactionMetadataEntry)
.thenCompose(position -> {
- TxnMeta txn = TxnMetaImpl.create(txnID);
+ TxnMeta txn = new TxnMetaImpl(txnID);
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn,
positions);
@@ -229,11 +228,13 @@ public class MLTransactionMetadataStore
return transactionLog.append(transactionMetadataEntry)
.thenCompose(position -> {
try {
-
txnMetaListPair.getLeft().addProducedPartitions(partitions);
- txnMetaMap.get(txnID).getRight().add(position);
+ synchronized (txnMetaListPair.getLeft()) {
+
txnMetaListPair.getLeft().addProducedPartitions(partitions);
+ txnMetaMap.get(txnID).getRight().add(position);
+ }
return CompletableFuture.completedFuture(null);
} catch (InvalidTxnStatusException e) {
- txnMetaMap.get(txnID).getRight().add(position);
+
transactionLog.deletePosition(Collections.singletonList(position));
log.error("TxnID : " +
txnMetaListPair.getLeft().id().toString()
+ " add produced partition error with
TxnStatus : "
+
txnMetaListPair.getLeft().status().name(), e);
@@ -262,11 +263,13 @@ public class MLTransactionMetadataStore
return transactionLog.append(transactionMetadataEntry)
.thenCompose(position -> {
try {
-
txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
- txnMetaMap.get(txnID).getRight().add(position);
+ synchronized (txnMetaListPair.getLeft()) {
+
txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+ txnMetaMap.get(txnID).getRight().add(position);
+ }
return CompletableFuture.completedFuture(null);
} catch (InvalidTxnStatusException e) {
- txnMetaMap.get(txnID).getRight().add(position);
+
transactionLog.deletePosition(Collections.singletonList(position));
log.error("TxnID : " +
txnMetaListPair.getLeft().id().toString()
+ " add acked subscription error with
TxnStatus : "
+
txnMetaListPair.getLeft().status().name(), e);
@@ -297,19 +300,20 @@ public class MLTransactionMetadataStore
return
transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
try {
- txnMetaListPair.getLeft().updateTxnStatus(newStatus,
expectedStatus);
- txnMetaListPair.getRight().add(position);
+ synchronized (txnMetaListPair.getLeft()) {
+ txnMetaListPair.getLeft().updateTxnStatus(newStatus,
expectedStatus);
+ txnMetaListPair.getRight().add(position);
+ }
if (newStatus == TxnStatus.COMMITTED || newStatus ==
TxnStatus.ABORTED) {
return
transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
txnMetaMap.remove(txnID);
txnIdSortedSet.remove(txnID.getLeastSigBits());
- ((TxnMetaImpl)
txnMetaListPair.getLeft()).recycle();
return CompletableFuture.completedFuture(null);
});
}
return CompletableFuture.completedFuture(null);
} catch (InvalidTxnStatusException e) {
- txnMetaListPair.getRight().add(position);
+
transactionLog.deletePosition(Collections.singletonList(position));
log.error("TxnID : " +
txnMetaListPair.getLeft().id().toString()
+ " add update txn status error with TxnStatus : "
+ txnMetaListPair.getLeft().status().name(), e);
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index 16c5a7b..4462ffc 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -38,38 +36,13 @@ import
org.apache.pulsar.transaction.coordinator.util.TransactionUtil;
*/
class TxnMetaImpl implements TxnMeta {
- private TxnID txnID;
+ private final TxnID txnID;
private final Set<String> producedPartitions = new HashSet<>();
private final Set<TransactionSubscription> ackedPartitions = new
HashSet<>();
private volatile TxnStatus txnStatus = TxnStatus.OPEN;
- private final Handle<TxnMetaImpl> recycleHandle;
- private static final Recycler<TxnMetaImpl> RECYCLER = new
Recycler<TxnMetaImpl>() {
- protected TxnMetaImpl newObject(Recycler.Handle<TxnMetaImpl> handle) {
- return new TxnMetaImpl(handle);
- }
- };
-
- TxnMetaImpl(Handle<TxnMetaImpl> handle) {
- this.recycleHandle = handle;
- }
-
- // Constructor for transaction metadata
- static TxnMetaImpl create(TxnID txnID) {
- @SuppressWarnings("unchecked")
- TxnMetaImpl txnMeta = RECYCLER.get();
- txnMeta.txnID = txnID;
- return txnMeta;
- }
-
- public void recycle() {
- this.producedPartitions.clear();
- this.ackedPartitions.clear();
- this.txnStatus = TxnStatus.OPEN;
-
- if (recycleHandle != null) {
- recycleHandle.recycle(this);
- }
+ TxnMetaImpl(TxnID txnID) {
+ this.txnID = txnID;
}
@Override