This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.15.x in repository https://gitbox.apache.org/repos/asf/activemq.git
commit 6ff79d85aae0068ede78927b7ea13d2783d9c767 Author: gtully <[email protected]> AuthorDate: Tue Jun 11 12:36:02 2019 +0100 AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit (cherry picked from commit 28a0cc6e5a78adb4b0b0134c860911c921f6a074) --- .../activemq/store/kahadb/MultiKahaDBTransactionStore.java | 14 +++++++++----- .../org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index 5befa92..d1b2d8e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -61,7 +61,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>(); - final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>(); + final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>(); private Journal journal; private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; @@ -279,10 +279,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore { public void persistOutcome(Tx tx, TransactionId txid) throws IOException { tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))))); + pendingCommit.put(txid, tx); } public void persistCompletion(TransactionId txid) throws IOException { store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); + pendingCommit.remove(txid); } private Location store(JournalCommand<?> data) throws IOException { @@ -335,6 +337,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore { for (Tx tx : inflightTransactions.values()) { knownDataFileIds.remove(tx.getPreparedLocationId()); } + for (Tx tx : pendingCommit.values()) { + knownDataFileIds.remove(tx.getPreparedLocationId()); + } try { journal.removeDataFiles(knownDataFileIds); } catch (Exception e) { @@ -360,8 +365,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore { process(location, load(location)); location = journal.getNextLocation(location); } - recoveredPendingCommit.addAll(inflightTransactions.keySet()); - LOG.info("pending local transactions: " + recoveredPendingCommit); + pendingCommit.putAll(inflightTransactions); + LOG.info("pending local transactions: " + pendingCommit.keySet()); } public JournalCommand<?> load(Location location) throws IOException { @@ -417,10 +422,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore { for (TransactionId txid : broker.getPreparedTransactions(null)) { if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { try { - if (recoveredPendingCommit.contains(txid)) { + if (pendingCommit.keySet().contains(txid)) { LOG.info("delivering pending commit outcome for tid: " + txid); broker.commitTransaction(null, txid, false); - recoveredPendingCommit.remove(txid); } else { LOG.info("delivering rollback outcome to store for tid: " + txid); broker.forgetTransaction(null, txid); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java index 4a7e9c6..da96431 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java @@ -207,7 +207,7 @@ public class MKahaDBTxRecoveryTest { multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024); - multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS); + multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10); broker = createBroker(multiKahaDBPersistenceAdapter); }
