Repository: activemq-artemis
Updated Branches:
  refs/heads/master fc4d5edef -> 840b248b1


ARTEMIS-1115 Traces and tests on JDBC Persistence


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bfe2bdd7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bfe2bdd7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bfe2bdd7

Branch: refs/heads/master
Commit: bfe2bdd7b29f2edc98b19ed11f64cad54ef7aaa2
Parents: 7b68b0a
Author: Clebert Suconic <[email protected]>
Authored: Tue Apr 18 14:36:25 2017 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Wed Apr 19 00:50:58 2017 -0400

----------------------------------------------------------------------
 .../jdbc/store/journal/JDBCJournalImpl.java     | 310 ++++++++++++++++---
 .../jdbc/store/journal/JDBCJournalRecord.java   |  19 ++
 .../paging/impl/PagingStoreFactoryDatabase.java |   6 +-
 .../core/ServerSessionPacketHandler.java        |   6 +-
 .../jdbc/store/journal/ShutdownServerTest.java  |  84 +++++
 .../tests/integration/xa/BasicXaTest.java       |   2 +-
 6 files changed, 370 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfe2bdd7/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 0e67dbc..7d30c4b 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -48,7 +49,6 @@ import 
org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.jboss.logging.Logger;
 
 public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@@ -74,6 +74,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
 
    private boolean started;
 
+   private AtomicBoolean failed = new AtomicBoolean(false);
+
    private JDBCJournalSync syncTimer;
 
    private final Executor completeExecutor;
@@ -161,8 +163,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
    }
 
    public synchronized int sync() {
-      if (!started)
-         return 0;
 
       List<JDBCJournalRecord> recordRef;
       synchronized (records) {
@@ -173,6 +173,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
          records.clear();
       }
 
+      if (!started || failed.get()) {
+         executeCallbacks(recordRef, false);
+         return 0;
+      }
+
+
       // We keep a list of deleted records and committed tx (used for cleaning 
up old transaction data).
       List<Long> deletedRecords = new ArrayList<>();
       List<Long> committedTransactions = new ArrayList<>();
@@ -183,6 +189,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
          connection.setAutoCommit(false);
 
          for (JDBCJournalRecord record : recordRef) {
+
+            if (logger.isTraceEnabled()) {
+               logger.trace("sync::preparing JDBC statment for " + record);
+            }
+
+
+
             switch (record.getRecordType()) {
                case JDBCJournalRecord.DELETE_RECORD:
                   // Standard SQL Delete Record, Non transactional delete
@@ -217,6 +230,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
          deleteJournalTxRecords.executeBatch();
 
          connection.commit();
+         if (logger.isTraceEnabled()) {
+            logger.trace("JDBC commit worked");
+         }
 
          cleanupTxRecords(deletedRecords, committedTransactions);
          executeCallbacks(recordRef, true);
@@ -224,14 +240,38 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
          return recordRef.size();
 
       } catch (Exception e) {
-         criticalIOErrorListener.onIOException(e, "Critical IO Error.  Failed 
to process JDBC Record statements", null);
-         started = false;
-         executeCallbacks(recordRef, false);
-         performRollback(recordRef);
+         handleException(recordRef, e);
          return 0;
       }
    }
 
+   /** public for tests only, not through API */
+   public void handleException(List<JDBCJournalRecord> recordRef, Throwable e) 
{
+      logger.warn(e.getMessage(), e);
+      failed.set(true);
+      criticalIOErrorListener.onIOException(e, "Critical IO Error.  Failed to 
process JDBC Record statements", null);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("Rolling back Transaction, just in case");
+      }
+
+      try {
+         connection.rollback();
+      } catch (Throwable rollback) {
+         logger.warn(rollback);
+      }
+
+      try {
+         connection.close();
+      } catch (Throwable rollback) {
+         logger.warn(rollback);
+      }
+
+      if (recordRef != null) {
+         executeCallbacks(recordRef, false);
+      }
+   }
+
    /* We store Transaction reference in memory (once all records associated 
with a Tranascation are Deleted,
       we remove the Tx Records (i.e. PREPARE, COMMIT). */
    private synchronized void cleanupTxRecords(List<Long> deletedRecords, 
List<Long> committedTx) throws SQLException {
@@ -263,45 +303,49 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       }
    }
 
-   private void performRollback(List<JDBCJournalRecord> records) {
-      try {
-         for (JDBCJournalRecord record : records) {
-            if (record.isTransactional() || record.getRecordType() == 
JDBCJournalRecord.PREPARE_RECORD) {
-               removeTxRecord(record);
-            }
-         }
-
-         List<TransactionHolder> txHolders = new ArrayList<>();
-         txHolders.addAll(transactions.values());
-
-         // On rollback we must update the tx map to remove all the tx entries
-         for (TransactionHolder txH : txHolders) {
-            if (!txH.prepared && txH.recordInfos.isEmpty() && 
txH.recordsToDelete.isEmpty()) {
-               transactions.remove(txH.transactionID);
-            }
-         }
-         connection.rollback();
-      } catch (Exception sqlE) {
-         logger.error(sqlE.getMessage(), sqlE);
-         criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null);
-         ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
-      }
-   }
-
-   private void executeCallbacks(final List<JDBCJournalRecord> records, final 
boolean result) {
+   private void executeCallbacks(final List<JDBCJournalRecord> records, final 
boolean success) {
       Runnable r = new Runnable() {
          @Override
          public void run() {
             for (JDBCJournalRecord record : records) {
-               record.complete(result);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Calling callback " + record + " with success = 
" + success);
+               }
+               record.complete(success);
             }
          }
       };
       completeExecutor.execute(r);
    }
 
+
+   private void checkStatus() {
+      checkStatus(null);
+   }
+
+   private void checkStatus(IOCompletion callback) {
+      if (!started) {
+         if (callback != null) callback.onError(-1, "JDBC Journal is not 
loaded");
+         throw new IllegalStateException("JDBCJournal is not loaded");
+      }
+
+      if (failed.get()) {
+         if (callback != null) callback.onError(-1, "JDBC Journal failed");
+         throw new IllegalStateException("JDBCJournal Failed");
+      }
+   }
+
+
    private void appendRecord(JDBCJournalRecord record) throws Exception {
 
+      // extra measure I know, as all the callers are also checking for this..
+      // better to be safe ;)
+      checkStatus();
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendRecord " + record);
+      }
+
       record.storeLineUp();
       if (!started) {
          if (record.getIoCompletion() != null) {
@@ -330,6 +374,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
    }
 
    private synchronized void addTxRecord(JDBCJournalRecord record) {
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("addTxRecord " + record + ", started=" + started + ", 
failed=" + failed);
+      }
+
+      checkStatus();
+
       TransactionHolder txHolder = transactions.get(record.getTxId());
       if (txHolder == null) {
          txHolder = new TransactionHolder(record.getTxId());
@@ -349,28 +400,22 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       }
    }
 
-   private synchronized void removeTxRecord(JDBCJournalRecord record) {
-      TransactionHolder txHolder = transactions.get(record.getTxId());
-
-      // We actually only need the record ID in this instance.
-      if (record.isTransactional()) {
-         RecordInfo info = new RecordInfo(record.getTxId(), 
record.getRecordType(), new byte[0], record.isUpdate(), 
record.getCompactCount());
-         if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
-            txHolder.recordsToDelete.remove(info);
-         } else {
-            txHolder.recordInfos.remove(info);
-         }
-      } else {
-         txHolder.prepared = false;
-      }
-   }
-
    @Override
    public void appendAddRecord(long id, byte recordType, byte[] record, 
boolean sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
+
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendAddRecord bytes[] " + r);
+      }
+
+
+
       appendRecord(r);
    }
 
@@ -380,6 +425,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       r.setUserRecordType(recordType);
       r.setRecord(persister, record);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendAddRecord (encoding) " + r + " with record = " + 
record);
+      }
+
+
       appendRecord(r);
    }
 
@@ -390,20 +441,36 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
                                Object record,
                                boolean sync,
                                IOCompletion completionCallback) throws 
Exception {
+      checkStatus(completionCallback);
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(persister, record);
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendAddRecord (completionCallback & encoding) " + r + 
" with record = " + record);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendUpdateRecord(long id, byte recordType, byte[] record, 
boolean sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendUpdateRecord (bytes)) " + r);
+      }
+
+
       appendRecord(r);
    }
 
@@ -413,6 +480,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       r.setUserRecordType(recordType);
       r.setRecord(persister, record);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendUpdateRecord (encoding)) " + r + " with record " 
+ record);
+      }
+
+
       appendRecord(r);
    }
 
@@ -423,36 +496,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
                                   Object record,
                                   boolean sync,
                                   IOCompletion completionCallback) throws 
Exception {
+      checkStatus(completionCallback);
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(persister, record);
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendUpdateRecord (encoding & completioncallback)) " + 
r + " with record " + record);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendDeleteRecord(long id, boolean sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendDeleteRecord id=" + id + " sync=" + sync);
+      }
+
       appendRecord(r);
    }
 
    @Override
    public void appendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception {
+      checkStatus(completionCallback);
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendDeleteRecord id=" + id + " sync=" + sync + " with 
completionCallback");
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendAddRecordTransactional(long txID, long id, byte 
recordType, byte[] record) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setTxId(txID);
       appendRecord(r);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + 
id + " using bytes[] r=" + r);
+      }
+
+
    }
 
    @Override
@@ -465,15 +569,29 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       r.setUserRecordType(recordType);
       r.setRecord(persister, record);
       r.setTxId(txID);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + 
id + " using encoding=" + record + " and r=" + r);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendUpdateRecordTransactional(long txID, long id, byte 
recordType, byte[] record) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setTxId(txID);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" 
+ id + " using bytes and r=" + r);
+      }
+
+
       appendRecord(r);
    }
 
@@ -487,46 +605,88 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       r.setUserRecordType(recordType);
       r.setRecord(persister, record);
       r.setTxId(txID);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" 
+ id + " using encoding=" + record + " and r=" + r);
+      }
+
       appendRecord(r);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, byte[] 
record) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
       r.setRecord(record);
       r.setTxId(txID);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" 
+ id + " using bytes and r=" + r);
+      }
+
       appendRecord(r);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, 
EncodingSupport record) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
       r.setRecord(EncoderPersister.getInstance(), record);
       r.setTxId(txID);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" 
+ id + " using encoding=" + record + " and r=" + r);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id) throws 
Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
       r.setTxId(txID);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" 
+ id);
+      }
+
       appendRecord(r);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(-1, 
JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendCommitRecord txID=" + txID + " sync=" + sync);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync, IOCompletion 
callback) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(-1, 
JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setSync(sync);
       r.setIoCompletion(callback);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendCommitRecord txID=" + txID + " callback=" + 
callback);
+      }
+
       appendRecord(r);
    }
 
@@ -535,20 +695,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
                                   boolean sync,
                                   IOCompletion callback,
                                   boolean lineUpContext) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(-1, 
JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setStoreLineUp(lineUpContext);
       r.setIoCompletion(callback);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendCommitRecord txID=" + txID + " using callback, 
lineup=" + lineUpContext);
+      }
+
       appendRecord(r);
    }
 
    @Override
    public void appendPrepareRecord(long txID, EncodingSupport transactionData, 
boolean sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(-1, 
JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setTxData(transactionData);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendPrepareRecord txID=" + txID + " using sync=" + 
sync);
+      }
+
+
       appendRecord(r);
    }
 
@@ -557,43 +732,74 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
                                    EncodingSupport transactionData,
                                    boolean sync,
                                    IOCompletion callback) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(0, 
JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setTxData(transactionData);
       r.setTxData(transactionData);
       r.setSync(sync);
       r.setIoCompletion(callback);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendPrepareRecord txID=" + txID + " using callback, 
sync=" + sync);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendPrepareRecord(long txID, byte[] transactionData, boolean 
sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(0, 
JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setTxData(transactionData);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendPrepareRecord txID=" + txID + " transactionData, 
sync=" + sync);
+      }
+
+
       appendRecord(r);
    }
 
    @Override
    public void appendRollbackRecord(long txID, boolean sync) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(0, 
JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setSync(sync);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync);
+      }
+
       appendRecord(r);
    }
 
    @Override
    public void appendRollbackRecord(long txID, boolean sync, IOCompletion 
callback) throws Exception {
+      checkStatus();
+
       JDBCJournalRecord r = new JDBCJournalRecord(0, 
JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setSync(sync);
       r.setIoCompletion(callback);
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync + 
" using callback");
+      }
+
+
       appendRecord(r);
    }
 
    @Override
-   public synchronized JournalLoadInformation load(LoaderCallback 
reloadManager) throws Exception {
+   public synchronized JournalLoadInformation load(LoaderCallback 
reloadManager) {
       JournalLoadInformation jli = new JournalLoadInformation();
       JDBCJournalReaderCallback jrc = new 
JDBCJournalReaderCallback(reloadManager);
       JDBCJournalRecord r;
@@ -643,6 +849,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
          jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
          jli.setNumberOfRecords(noRecords);
          transactions = jrc.getTransactions();
+      } catch (Throwable e) {
+         handleException(null, e);
       }
       return jli;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfe2bdd7/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index a33888d..6750da1 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -306,4 +306,23 @@ class JDBCJournalRecord {
    long getSeq() {
       return seq;
    }
+
+   @Override
+   public String toString() {
+      return "JDBCJournalRecord{" +
+         "compactCount=" + compactCount +
+         ", id=" + id +
+         ", isTransactional=" + isTransactional +
+         ", isUpdate=" + isUpdate +
+         ", recordType=" + recordType +
+         ", seq=" + seq +
+         ", storeLineUp=" + storeLineUp +
+         ", sync=" + sync +
+         ", txCheckNoRecords=" + txCheckNoRecords +
+         ", txDataSize=" + txDataSize +
+         ", txId=" + txId +
+         ", userRecordType=" + userRecordType +
+         ", variableSize=" + variableSize +
+         '}';
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfe2bdd7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 112cc46f..12e2b35 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -122,12 +122,10 @@ public class PagingStoreFactoryDatabase implements 
PagingStoreFactory {
             if (sqlProviderFactory == null) {
                sqlProviderFactory = new GenericSQLProvider.Factory();
             }
-            pagingFactoryFileFactory = new 
JDBCSequentialFileFactory(dbConf.getDataSource(), 
sqlProviderFactory.create(dbConf.getPageStoreTableName(), 
SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), 
criticalErrorListener);
-            pagingFactoryFileFactory = new 
JDBCSequentialFileFactory(dbConf.getDataSource(), 
sqlProviderFactory.create(pageStoreTableNamePrefix, 
SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
+            pagingFactoryFileFactory = new 
JDBCSequentialFileFactory(dbConf.getDataSource(), 
sqlProviderFactory.create(pageStoreTableNamePrefix, 
SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), 
criticalErrorListener);
          } else {
             String driverClassName = dbConf.getJdbcDriverClassName();
-            pagingFactoryFileFactory = new 
JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, 
JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), 
SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), 
criticalErrorListener);
-            pagingFactoryFileFactory = new 
JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, 
JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, 
SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
+            pagingFactoryFileFactory = new 
JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, 
JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, 
SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), 
criticalErrorListener);
          }
          pagingFactoryFileFactory.start();
          started = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfe2bdd7/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 92cae64..385376e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -614,13 +614,17 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
             doConfirmAndResponse(confirmPacket, exceptionMessage, flush, 
closeChannel);
 
             if (logger.isTraceEnabled()) {
-               logger.trace("ServerSessionPacketHandler::response sent::" + 
response);
+               logger.trace("ServerSessionPacketHandler::exception response 
sent::" + exceptionMessage);
             }
 
          }
 
          @Override
          public void done() {
+            if (logger.isTraceEnabled()) {
+               logger.trace("ServerSessionPacketHandler::regular response 
sent::" + response);
+            }
+
             doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
          }
       });

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfe2bdd7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java
new file mode 100644
index 0000000..cf2815c
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ShutdownServerTest extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+   private ServerLocator locator;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = createServer(true, createDefaultJDBCConfig(false), 
AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
+      server.start();
+
+      locator = createFactory(false);
+   }
+
+   @Test
+   public void testShutdownServer() throws Throwable {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true, false);
+
+      session.createQueue(QUEUE, QUEUE, null, true);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+      ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 
0, System.currentTimeMillis(), (byte) 4);
+      message.getBodyBuffer().writeString("hi");
+      message.putStringProperty("hello", "elo");
+      producer.send(message);
+
+      ActiveMQServerImpl impl = (ActiveMQServerImpl) server;
+      JournalStorageManager journal = (JournalStorageManager) 
impl.getStorageManager();
+      JDBCJournalImpl journalimpl = (JDBCJournalImpl) 
journal.getMessageJournal();
+      journalimpl.handleException(null, new Exception("failure"));
+
+      Wait.waitFor(() -> !server.isStarted());
+
+      Assert.assertFalse(server.isStarted());
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfe2bdd7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
index 510452e..07cacf2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
@@ -95,7 +95,7 @@ public class BasicXaTest extends ActiveMQTestBase {
          configuration = createDefaultNettyConfig();
       }
 
-      messagingService = createServer(false, configuration, -1, -1, 
addressSettings);
+      messagingService = createServer(true, configuration, -1, -1, 
addressSettings);
 
       // start the server
       messagingService.start();

Reply via email to