Author: dejanb
Date: Fri Nov 20 11:36:45 2009
New Revision: 882511

URL: http://svn.apache.org/viewvc?rev=882511&view=rev
Log:
merging https://issues.apache.org/activemq/browse/AMQ-2042 - 834922,835373,835412,835833,835888,880792,881221,882144

Added:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
      - copied, changed from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
      - copied unchanged from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

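The merged AMQ-2042 work makes broker IO-error handling pluggable: BrokerService now exposes setIoExceptionHandler()/handleIOException(), installs a DefaultIOExceptionHandler on start when none is configured, and the persistence adapters below route store IOExceptions through it. A minimal usage sketch based on the API shown in this diff (the broker name and connector URI are illustrative values, not part of the commit):

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.util.DefaultIOExceptionHandler;

    public class IoExceptionHandlerConfig {
        public static void main(String[] args) throws Exception {
            BrokerService broker = new BrokerService();
            broker.setBrokerName("example");              // illustrative name
            broker.addConnector("tcp://localhost:61616"); // illustrative URI

            DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
            handler.setIgnoreAllErrors(false);     // stop the broker on fatal IO errors
            handler.setIgnoreNoSpaceErrors(true);  // but only log "no space left" errors
            broker.setIoExceptionHandler(handler); // also wires handler.setBrokerService(broker)

            broker.start();
            // Store failures now reach broker.handleIOException(...), which
            // delegates to the configured handler.
        }
    }
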
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Nov 20 11:36:45 2009
@@ -84,6 +84,8 @@
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.JMXSupport;
@@ -178,7 +180,9 @@
     private int systemExitOnShutdownExitCode;
     private SslContext sslContext;
     private boolean forceStart = false;
-    static {
+    private IOExceptionHandler ioExceptionHandler;
+
+       static {
         String localHostName = "localhost";
         try {
             localHostName = java.net.InetAddress.getLocalHost().getHostName();
@@ -481,6 +485,9 @@
                 }
             }
             brokerId = broker.getBrokerId();
+            if (ioExceptionHandler == null) {
+               setIoExceptionHandler(new DefaultIOExceptionHandler());
+            }
             LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " 
+ brokerId + ") started");
             getBroker().brokerServiceStarted();
             startedLatch.countDown();
@@ -2008,6 +2015,14 @@
             }
         }
     }
+    
+    public void handleIOException(IOException exception) {
+        if (ioExceptionHandler != null) {
+            ioExceptionHandler.handle(exception);
+         } else {
+            LOG.info("Ignoring IO exception, " + exception, exception);
+         }
+    }
 
     /**
      * Starts all destiantions in persistence store. This includes all inactive
@@ -2111,5 +2126,10 @@
         this.passiveSlave = passiveSlave;
     }
     
+    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
+        ioExceptionHandler.setBrokerService(this);
+        this.ioExceptionHandler = ioExceptionHandler;
+    }
+    
    
 }
\ No newline at end of file

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -695,7 +695,13 @@
     }
     
     public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
-        return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
+       try {
+               return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
+       } catch (IOException ioe) {
+               LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
+               brokerService.handleIOException(ioe);
+               throw ioe;
+        }
     }
 
     private Location writeTraceMessage(String message, boolean sync) throws IOException {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -623,7 +623,7 @@
                    return journal.write(toPacket(wireFormat.marshal(command)), sync);
             } catch (IOException ioe) {
                    LOG.error("Cannot write to the journal", ioe);
-                   stopBroker();
+                   brokerService.handleIOException(ioe);
                    throw ioe;
             }
         }
@@ -725,17 +725,5 @@
             ((BrokerServiceAware)pa).setBrokerService(brokerService);
         }
     }
-    
-    protected void stopBroker() {
-        new Thread() {
-           public void run() {
-                  try {
-                   brokerService.stop();
-               } catch (Exception e) {
-                   LOG.warn("Failure occured while stopping broker");
-               }                       
-               }
-       }.start();
-    }
 
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -24,6 +24,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -54,7 +56,7 @@
  * @org.apache.xbean.XBean
  * @version $Revision: 1.4 $
  */
-public class KahaPersistenceAdapter implements PersistenceAdapter {
+public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
 
     private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
@@ -73,6 +75,7 @@
     private boolean initialized;
     private final AtomicLong storeSize;
     private boolean persistentIndex = true;
+    private BrokerService brokerService;
 
     
     public KahaPersistenceAdapter(AtomicLong size) {
@@ -175,6 +178,7 @@
                     container.setValueMarshaller(new TransactionMarshaller(wireFormat));
                     container.load();
                     transactionStore = new KahaTransactionStore(this, container);
+                    transactionStore.setBrokerService(brokerService);
                     break;
                 } catch (StoreLockedExcpetion e) {
                     LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
@@ -361,6 +365,10 @@
             wireFormat.setTightEncodingEnabled(true);
         }
     }
+
+       public void setBrokerService(BrokerService brokerService) {
+               this.brokerService = brokerService;
+       }
   
 
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Fri Nov 20 11:36:45 2009
@@ -24,17 +24,23 @@
 
 import javax.transaction.xa.XAException;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Provides a TransactionStore implementation that can create transaction aware
@@ -42,10 +48,14 @@
  * 
  * @version $Revision: 1.4 $
  */
-public class KahaTransactionStore implements TransactionStore {
+public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
+    private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
+       
     private Map transactions = new ConcurrentHashMap();
     private Map prepared;
     private KahaPersistenceAdapter adaptor;
+    
+    private BrokerService brokerService;
 
     KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
         this.adaptor = adaptor;
@@ -130,12 +140,19 @@
      * @throws IOException
      */
     void addMessage(final MessageStore destination, final Message message) throws IOException {
-        if (message.isInTransaction()) {
-            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
-            tx.add((KahaMessageStore)destination, message);
-        } else {
-            destination.addMessage(null, message);
-        }
+       try {
+               if (message.isInTransaction()) {
+                       KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+                       tx.add((KahaMessageStore)destination, message);
+               } else {
+                       destination.addMessage(null, message);
+               }
+       } catch (RuntimeStoreException rse) {
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
+       }
     }
 
     /**
@@ -143,12 +160,19 @@
      * @throws IOException
      */
     final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
-        if (ack.isInTransaction()) {
-            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
-            tx.add((KahaMessageStore)destination, ack);
-        } else {
-            destination.removeMessage(null, ack);
-        }
+       try {
+               if (ack.isInTransaction()) {
+                       KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+                       tx.add((KahaMessageStore)destination, ack);
+               } else {
+                       destination.removeMessage(null, ack);
+               }
+       } catch (RuntimeStoreException rse) {
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
+       }
     }
 
     protected synchronized KahaTransaction getTx(TransactionId key) {
@@ -181,4 +205,8 @@
     protected MessageStore getStoreById(Object id) {
         return adaptor.retrieveMessageStore(id);
     }
+
+       public void setBrokerService(BrokerService brokerService) {
+               this.brokerService = brokerService;
+       }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -17,6 +17,8 @@
 package org.apache.activemq.store.kahadb;
 
 import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +39,7 @@
  * @org.apache.xbean.XBean element="kahaDB"
  * @version $Revision: 1.17 $
  */
-public class KahaDBPersistenceAdapter implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
     private KahaDBStore letter = new KahaDBStore();
     
 
@@ -364,4 +366,8 @@
     public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
+
+       public void setBrokerService(BrokerService brokerService) {
+               letter.setBrokerService(brokerService);
+       }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Nov 20 11:36:45 2009
@@ -36,6 +36,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -75,8 +77,11 @@
 import org.apache.kahadb.util.SequenceSet;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
+import org.springframework.core.enums.LetterCodedLabeledEnum;
 
-public class MessageDatabase {
+public class MessageDatabase implements BrokerServiceAware {
+       
+       private BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -227,6 +232,41 @@
         }
        }
        
+       private void startCheckpoint() {
+        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+            public void run() {
+                try {
+                    long lastCleanup = System.currentTimeMillis();
+                    long lastCheckpoint = System.currentTimeMillis();
+                    // Sleep for a short time so we can periodically check 
+                    // to see if we need to exit this thread.
+                    long sleepTime = Math.min(checkpointInterval, 500);
+                    while (opened.get()) {
+                        
+                        Thread.sleep(sleepTime);
+                        long now = System.currentTimeMillis();
+                        if( now - lastCleanup >= cleanupInterval ) {
+                            checkpointCleanup(true);
+                            lastCleanup = now;
+                            lastCheckpoint = now;
+                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+                            checkpointCleanup(false);
+                            lastCheckpoint = now;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Looks like someone really wants us to exit this thread...
+                } catch (IOException ioe) {
+                    LOG.error("Checkpoint failed", ioe);
+                    brokerService.handleIOException(ioe);
+                }
+            }
+                    
+        };
+        checkpointThread.setDaemon(true);
+        checkpointThread.start();
+       }
+       
        /**
         * @throws IOException
         */
@@ -236,34 +276,7 @@
             
                loadPageFile();
                
-               checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-                   public void run() {
-                       try {
-                           long lastCleanup = System.currentTimeMillis();
-                           long lastCheckpoint = System.currentTimeMillis();
-                           
-                           // Sleep for a short time so we can periodically check 
-                           // to see if we need to exit this thread.
-                           long sleepTime = Math.min(checkpointInterval, 500);
-                           while (opened.get()) {
-                               Thread.sleep(sleepTime);
-                               long now = System.currentTimeMillis();
-                               if( now - lastCleanup >= cleanupInterval ) {
-                                   checkpointCleanup(true);
-                                   lastCleanup = now;
-                                   lastCheckpoint = now;
-                               } else if( now - lastCheckpoint >= checkpointInterval ) {
-                                   checkpointCleanup(false);
-                                   lastCheckpoint = now;
-                               }
-                           }
-                       } catch (InterruptedException e) {
-                           // Looks like someone really wants us to exit this thread...
-                       }
-                   }
-               };
-               checkpointThread.setDaemon(true);
-               checkpointThread.start();
+               startCheckpoint();
             recover();
                }
        }
@@ -575,26 +588,22 @@
         return journal.getNextLocation(null);
        }
 
-    protected void checkpointCleanup(final boolean cleanup) {
-        try {
-               long start = System.currentTimeMillis();
-            synchronized (indexMutex) {
-               if( !opened.get() ) {
-                       return;
-               }
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, cleanup);
-                    }
-                });
-            }
-               long end = System.currentTimeMillis();
-               if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-                       LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+    protected void checkpointCleanup(final boolean cleanup) throws IOException {
+       long start = System.currentTimeMillis();
+        synchronized (indexMutex) {
+               if( !opened.get() ) {
+                       return;
                }
-        } catch (IOException e) {
-               e.printStackTrace();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, cleanup);
+                }
+            });
         }
+       long end = System.currentTimeMillis();
+       if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+               LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+       }
     }
 
     
@@ -617,32 +626,40 @@
     }
 
     /**
-     * All updated are are funneled through this method. The updates a converted
+     * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
-     * durring a recovery process.
+     * during a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
-
-       
-        int size = data.serializedSizeFramed();
-        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.type().getNumber());
-        data.writeFramed(os);
-
-        long start = System.currentTimeMillis();
-        Location location = journal.write(os.toByteSequence(), sync);
-        long start2 = System.currentTimeMillis();
-        process(data, location);
-       long end = System.currentTimeMillis();
-       if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-               LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+       try {
+            int size = data.serializedSizeFramed();
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+            os.writeByte(data.type().getNumber());
+            data.writeFramed(os);
+    
+            long start = System.currentTimeMillis();
+            Location location = journal.write(os.toByteSequence(), sync);
+            long start2 = System.currentTimeMillis();
+            process(data, location);
+               long end = System.currentTimeMillis();
+               if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+                       LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+               }
+    
+            synchronized (indexMutex) {
+               metadata.lastUpdate = location;
+            }
+            if (!checkpointThread.isAlive()) {
+                LOG.info("KahaDB: Recovering checkpoint thread after exception");
+                startCheckpoint();
+            }
+            return location;
+       } catch (IOException ioe) {
+            LOG.error("KahaDB failed to store to Journal", ioe);
+            brokerService.handleIOException(ioe);
+           throw ioe;
        }
-
-        synchronized (indexMutex) {
-               metadata.lastUpdate = location;
-        }
-        return location;
     }
 
     /**
@@ -1530,4 +1547,8 @@
     public void setChecksumJournalFiles(boolean checksumJournalFiles) {
         this.checksumJournalFiles = checksumJournalFiles;
     }
+
+       public void setBrokerService(BrokerService brokerService) {
+               this.brokerService = brokerService;
+       }
 }

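The MessageDatabase change above extracts the checkpoint loop into startCheckpoint() so that store() can both report journal failures through brokerService.handleIOException() and revive the checkpoint worker if it previously died. A generic sketch of that restart pattern follows (class and method names here are hypothetical, not the ActiveMQ code):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class CheckpointingStoreSketch {
        private final AtomicBoolean opened = new AtomicBoolean(true);
        private volatile Thread checkpointThread;

        private void startCheckpoint() {
            checkpointThread = new Thread("checkpoint-worker") {
                public void run() {
                    try {
                        while (opened.get()) {
                            Thread.sleep(500);   // poll interval
                            doCheckpoint();      // may throw, which ends this thread
                        }
                    } catch (Exception e) {
                        // The worker dies here; the next store() call notices and restarts it.
                    }
                }
            };
            checkpointThread.setDaemon(true);
            checkpointThread.start();
        }

        public void store(byte[] record) {
            // ... append the record to the journal and update the index ...
            if (checkpointThread == null || !checkpointThread.isAlive()) {
                startCheckpoint();   // revive the worker after a previous failure
            }
        }

        void doCheckpoint() throws Exception {
            // placeholder: flush the index and clean up completed journal files
        }
    }
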
Copied: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?p2=activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java&r1=835888&r2=882511&rev=882511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Fri Nov 20 11:36:45 2009
@@ -28,12 +28,25 @@
             .getLog(DefaultIOExceptionHandler.class);
     private BrokerService broker;
     private boolean ignoreAllErrors = false;
+    private boolean ignoreNoSpaceErrors = true;
+    private String noSpaceMessage = "space";
 
     public void handle(IOException exception) {
         if (ignoreAllErrors) {
             LOG.info("Ignoring IO exception, " + exception, exception);
             return;
         }
+        
+        if (ignoreNoSpaceErrors) {
+            Throwable cause = exception;
+            while (cause != null && cause instanceof IOException) {
+                if (cause.getMessage().contains(noSpaceMessage)) {
+                    LOG.info("Ignoring no space left exception, " + exception, exception);
+                    return;
+                }
+                cause = cause.getCause();
+            }
+        }
 
         LOG.info("Stopping the broker due to IO exception, " + exception, 
exception);
         new Thread() {
@@ -59,4 +72,20 @@
         this.ignoreAllErrors = ignoreAllErrors;
     }
 
+    public boolean isIgnoreNoSpaceErrors() {
+        return ignoreNoSpaceErrors;
+    }
+
+    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
+        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
+    }
+
+    public String getNoSpaceMessage() {
+        return noSpaceMessage;
+    }
+
+    public void setNoSpaceMessage(String noSpaceMessage) {
+        this.noSpaceMessage = noSpaceMessage;
+    }
+
 }

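DefaultIOExceptionHandler walks the IOException cause chain and, when ignoreNoSpaceErrors is set, only logs exceptions whose message contains the configured noSpaceMessage substring ("space" by default); anything else stops the broker from a separate thread. A small sketch of the ignore path using the setters added above (the exception messages are made-up examples of what a disk-full failure might look like):

    import java.io.IOException;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.util.DefaultIOExceptionHandler;

    public class NoSpaceHandlingSketch {
        public static void main(String[] args) throws Exception {
            BrokerService broker = new BrokerService();
            DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
            handler.setIgnoreNoSpaceErrors(true); // default in this commit
            handler.setNoSpaceMessage("space");   // substring matched against the cause chain
            broker.setIoExceptionHandler(handler);

            IOException diskFull = new IOException("Journal write failed",
                    new IOException("No space left on device"));
            // The cause message contains "space", so the handler only logs it
            // and the broker keeps running instead of being stopped.
            broker.handleIOException(diskFull);
        }
    }
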
Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Nov 20 11:36:45 2009
@@ -21,6 +21,7 @@
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
@@ -85,6 +86,7 @@
         public final CountDownLatch latch = new CountDownLatch(1);
                private final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
 
         public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
             this.dataFile = dataFile;
@@ -158,7 +160,7 @@
      * @throws
      */
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+       
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
@@ -184,6 +186,10 @@
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
+            IOException exception = batch.exception.get(); 
+            if (exception != null) {
+               throw exception;
+            }
         }      
 
         return location;
@@ -213,10 +219,7 @@
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
             }
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
+            
             if (!running) {
                 running = true;
                 thread = new Thread() {
@@ -228,6 +231,11 @@
                 thread.setDaemon(true);
                 thread.setName("ActiveMQ Data File Writer");
                 thread.start();
+                firstAsyncException = null;
+            }
+            
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
             }
 
             while ( true ) {
@@ -298,6 +306,7 @@
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
+        WriteBatch wb = null;
         try {
 
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +330,7 @@
                     enqueueMutex.notify();
                 }
 
-                WriteBatch wb = (WriteBatch)o;
+                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         file.setLength(dataFile.getLength());
@@ -405,6 +414,14 @@
         } catch (IOException e) {
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
+                if (wb != null) {
+                    wb.latch.countDown();
+                    wb.exception.set(e);
+                }
+                if (nextWriteBatch != null) {
+                   nextWriteBatch.latch.countDown();
+                   nextWriteBatch.exception.set(e);
+                }
             }
         } catch (InterruptedException e) {
         } finally {
@@ -415,6 +432,7 @@
             } catch (Throwable ignore) {
             }
             shutdownDone.countDown();
+            running = false;
         }
     }
 

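The DataFileAppender change records the async writer's IOException on the WriteBatch and counts down its latch, so producers blocked in storeItem() rethrow the failure instead of waiting forever or returning as if the write succeeded. A stripped-down sketch of that hand-off pattern (hypothetical names, not the kahadb classes):

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;

    public class AsyncWriteSketch {
        static class WriteBatch {
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicReference<IOException> exception = new AtomicReference<IOException>();
        }

        // Caller side: wait for the writer thread, then surface any failure.
        Object storeItem(WriteBatch batch) throws IOException, InterruptedException {
            batch.latch.await();
            IOException failure = batch.exception.get();
            if (failure != null) {
                throw failure;       // producer sees the disk error
            }
            return new Object();     // stand-in for the journal Location
        }

        // Writer-thread side: on failure, fail the batch and wake the waiters.
        void writerLoop(WriteBatch batch) {
            try {
                // ... append the batch to the data file, fsync if requested ...
            } catch (Exception e) {
                batch.exception.set(new IOException(e.toString()));
            } finally {
                batch.latch.countDown();
            }
        }
    }
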
Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Nov 20 11:36:45 2009
@@ -44,6 +44,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -165,8 +166,8 @@
         }
         
         void begin() {
-            diskBound = current;
-            current = null;
+           diskBound = current;
+           current = null;
         }
         
         /**
@@ -176,6 +177,10 @@
             diskBound=null;
             return current == null;
         }
+        
+        boolean isDone() {
+            return diskBound == null && current == null;
+        }
 
     }
     
@@ -937,12 +942,15 @@
             // If there is not enough to write, wait for a notification...
 
             batch = new ArrayList<PageWrite>(writes.size());
-            // build a write batch from the current write cache. 
+            // build a write batch from the current write cache.
             for (PageWrite write : writes.values()) {
                 batch.add(write);
                 // Move the current write to the diskBound write, this lets folks update the 
                 // page again without blocking for this write.
                 write.begin();
+                if (write.diskBound == null) {
+                    batch.remove(write);
+                }
             }
 
             // Grab on to the existing checkpoint latch cause once we do this write we can 
@@ -951,71 +959,82 @@
             this.checkpointLatch=null;
         }
         
- 
-       if (enableRecoveryFile) {
-           
-           // Using Adler-32 instead of CRC-32 because it's much faster and it's 
-           // weakness for short messages with few hundred bytes is not a factor in this case since we know 
-           // our write batches are going to much larger.
-           Checksum checksum = new Adler32();
-           for (PageWrite w : batch) {
-               checksum.update(w.diskBound, 0, pageSize);
-           }
-           
-           // Can we shrink the recovery buffer??
-           if( recoveryPageCount > recoveryFileMaxPageCount ) {
-               int t = Math.max(recoveryFileMinPageCount, batch.size());
-               recoveryFile.setLength(recoveryFileSizeForPages(t));
-           }
-           
-            // Record the page writes in the recovery buffer.
-            recoveryFile.seek(0);
-            // Store the next tx id...
-            recoveryFile.writeLong(nextTxid.get());
-            // Store the checksum for thw write batch so that on recovery we know if we have a consistent 
-            // write batch on disk.
-            recoveryFile.writeLong(checksum.getValue());
-            // Write the # of pages that will follow
-            recoveryFile.writeInt(batch.size());
-            
-            
-            // Write the pages.
-            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+       try {
+            if (enableRecoveryFile) {
+
+                // Using Adler-32 instead of CRC-32 because it's much faster and
+                // it's
+                // weakness for short messages with few hundred bytes is not a
+                // factor in this case since we know
+                // our write batches are going to much larger.
+                Checksum checksum = new Adler32();
+                for (PageWrite w : batch) {
+                    try {
+                        checksum.update(w.diskBound, 0, pageSize);
+                    } catch (Throwable t) {
+                        throw IOExceptionSupport.create(
+                                "Cannot create recovery file. Reason: " + t, t);
+                    }
+                }
+
+                // Can we shrink the recovery buffer??
+                if (recoveryPageCount > recoveryFileMaxPageCount) {
+                    int t = Math.max(recoveryFileMinPageCount, batch.size());
+                    recoveryFile.setLength(recoveryFileSizeForPages(t));
+                }
+
+                // Record the page writes in the recovery buffer.
+                recoveryFile.seek(0);
+                // Store the next tx id...
+                recoveryFile.writeLong(nextTxid.get());
+                // Store the checksum for thw write batch so that on recovery we
+                // know if we have a consistent
+                // write batch on disk.
+                recoveryFile.writeLong(checksum.getValue());
+                // Write the # of pages that will follow
+                recoveryFile.writeInt(batch.size());
+
+                // Write the pages.
+                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+
+                for (PageWrite w : batch) {
+                    recoveryFile.writeLong(w.page.getPageId());
+                    recoveryFile.write(w.diskBound, 0, pageSize);
+                }
+
+                if (enableDiskSyncs) {
+                    // Sync to make sure recovery buffer writes land on disk..
+                    recoveryFile.getFD().sync();
+                }
+
+                recoveryPageCount = batch.size();
+            }
+
             for (PageWrite w : batch) {
-                recoveryFile.writeLong(w.page.getPageId());
-                recoveryFile.write(w.diskBound, 0, pageSize);
+                writeFile.seek(toOffset(w.page.getPageId()));
+                writeFile.write(w.diskBound, 0, pageSize);
+                w.done();
             }
-            
+
+            // Sync again
             if (enableDiskSyncs) {
-                // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
+                writeFile.getFD().sync();
             }
-            
-            recoveryPageCount = batch.size();
-        }
-       
-        
-        for (PageWrite w : batch) {
-            writeFile.seek(toOffset(w.page.getPageId()));
-            writeFile.write(w.diskBound, 0, pageSize);
-        }
-        
-        // Sync again
-        if( enableDiskSyncs ) {
-            writeFile.getFD().sync();
-        }
-        
-        synchronized( writes ) {
-            for (PageWrite w : batch) {
-                // If there are no more pending writes, then remove it from the write cache.
-                if( w.done() ) {
-                    writes.remove(w.page.getPageId());
+
+        } finally {
+            synchronized (writes) {
+                for (PageWrite w : batch) {
+                    // If there are no more pending writes, then remove it from
+                    // the write cache.
+                    if (w.isDone()) {
+                        writes.remove(w.page.getPageId());
+                    }
                 }
             }
-        }
-        
-        if( checkpointLatch!=null ) {
-            checkpointLatch.countDown();
+            
+            if( checkpointLatch!=null ) {
+                checkpointLatch.countDown();
+            }
         }
     }
 

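The PageFile change wraps the recovery-file and page writes in try/finally so that, even when a write fails, completed PageWrites are pruned from the write cache and the checkpoint latch is always counted down. A rough sketch of that shape (hypothetical names, not the kahadb PageFile code):

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    public class BatchWriteSketch {
        void writeBatch(Iterable<PageWrite> batch, CountDownLatch checkpointLatch) throws IOException {
            try {
                for (PageWrite w : batch) {
                    w.flush();          // seek + write + optional sync; may throw
                    w.done();           // mark the disk-bound copy as written
                }
            } finally {
                for (PageWrite w : batch) {
                    if (w.isDone()) {   // only fully written entries leave the cache
                        removeFromWriteCache(w);
                    }
                }
                if (checkpointLatch != null) {
                    checkpointLatch.countDown(); // never leave checkpoint() hanging
                }
            }
        }

        // Hypothetical collaborators, stubbed so the sketch stands alone.
        interface PageWrite { void flush() throws IOException; void done(); boolean isDone(); }
        void removeFromWriteCache(PageWrite w) { }
    }
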
