Repository: activemq-artemis Updated Branches: refs/heads/1.x f94f8f471 -> 200088778
ARTEMIS-1085 Perform storelineup on appendRecord (cherry picked from commit 120b8aa7ad314b30bca082fb4f809bfa15a1dea1) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/20008877 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/20008877 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/20008877 Branch: refs/heads/1.x Commit: 200088778e2517d299ad3e3efa315ecafc8a8c41 Parents: bae011b Author: Martyn Taylor <[email protected]> Authored: Thu Mar 30 16:25:17 2017 +0100 Committer: Martyn Taylor <[email protected]> Committed: Mon Apr 3 12:12:33 2017 +0100 ---------------------------------------------------------------------- .../jdbc/store/journal/JDBCJournalImpl.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20008877/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index eb7cda1..a548157 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -167,8 +167,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { boolean success = false; try { for (JDBCJournalRecord record : recordRef) { - record.storeLineUp(); - switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: // Standard SQL Delete Record, Non transactional delete @@ -222,16 +220,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { cleanupTxRecords(deletedRecords, committedTransactions); } catch (SQLException e) { logger.warn("Failed to remove the Tx Records", e.getMessage(), e); + } finally { + executeCallbacks(recordRef, success); } - executeCallbacks(recordRef, success); return recordRef.size(); } /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException { - connection.rollback(); List<RecordInfo> iterableCopy; List<TransactionHolder> iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); @@ -262,6 +260,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private void performRollback(List<JDBCJournalRecord> records) { try { + connection.rollback(); + for (JDBCJournalRecord record : records) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { removeTxRecord(record); @@ -297,9 +297,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private void appendRecord(JDBCJournalRecord record) throws Exception { + record.storeLineUp(); SimpleWaitIOCallback callback = null; - if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) { + if (record.isSync() && record.getIoCompletion() == null) { callback = new SimpleWaitIOCallback(); record.setIoCompletion(callback); } @@ -316,8 +317,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { syncTimer.delay(); - if (callback != null) + if (callback != null) { callback.waitCompletion(); + } } private synchronized void addTxRecord(JDBCJournalRecord record) { @@ -504,6 +506,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendCommitRecord(long txID, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); + r.setSync(sync); appendRecord(r); } @@ -511,6 +514,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); + r.setSync(sync); r.setIoCompletion(callback); appendRecord(r); } @@ -524,6 +528,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); + r.setSync(sync); appendRecord(r); }
