Author: dejanb
Date: Mon Nov 16 15:22:19 2009
New Revision: 880792

URL: http://svn.apache.org/viewvc?rev=880792&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2042 - enable kahadb to recover from 'no space available'

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Mon Nov 16 15:22:19 2009
@@ -232,6 +232,41 @@
         }
        }
        
+       private void startCheckpoint() {
+        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+            public void run() {
+                try {
+                    long lastCleanup = System.currentTimeMillis();
+                    long lastCheckpoint = System.currentTimeMillis();
+                    // Sleep for a short time so we can periodically check 
+                    // to see if we need to exit this thread.
+                    long sleepTime = Math.min(checkpointInterval, 500);
+                    while (opened.get()) {
+                        
+                        Thread.sleep(sleepTime);
+                        long now = System.currentTimeMillis();
+                        if( now - lastCleanup >= cleanupInterval ) {
+                            checkpointCleanup(true);
+                            lastCleanup = now;
+                            lastCheckpoint = now;
+                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+                            checkpointCleanup(false);
+                            lastCheckpoint = now;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Looks like someone really wants us to exit this thread...
+                } catch (IOException ioe) {
+                    LOG.error("Checkpoint failed", ioe);
+                    brokerService.handleIOException(ioe);
+                }
+            }
+                    
+        };
+        checkpointThread.setDaemon(true);
+        checkpointThread.start();
+       }
+       
        /**
         * @throws IOException
         */
@@ -241,37 +276,7 @@
             
                loadPageFile();
                
-               checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-                   public void run() {
-                       try {
-                           long lastCleanup = System.currentTimeMillis();
-                           long lastCheckpoint = System.currentTimeMillis();
-                           
-                           // Sleep for a short time so we can periodically check 
-                           // to see if we need to exit this thread.
-                           long sleepTime = Math.min(checkpointInterval, 500);
-                           while (opened.get()) {
-                               Thread.sleep(sleepTime);
-                               long now = System.currentTimeMillis();
-                               if( now - lastCleanup >= cleanupInterval ) {
-                                   checkpointCleanup(true);
-                                   lastCleanup = now;
-                                   lastCheckpoint = now;
-                               } else if( now - lastCheckpoint >= checkpointInterval ) {
-                                   checkpointCleanup(false);
-                                   lastCheckpoint = now;
-                               }
-                           }
-                       } catch (InterruptedException e) {
-                           // Looks like someone really wants us to exit this thread...
-                       } catch (IOException ioe) {
-                           LOG.error("Checkpoint failed", ioe);
-                           brokerService.handleIOException(ioe);
-                       }
-                   }
-               };
-               checkpointThread.setDaemon(true);
-               checkpointThread.start();
+               startCheckpoint();
             recover();
                }
        }
@@ -621,31 +626,40 @@
     }
 
     /**
-     * All updated are are funneled through this method. The updates a converted
+     * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
-     * durring a recovery process.
+     * during a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
-       
-        int size = data.serializedSizeFramed();
-        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.type().getNumber());
-        data.writeFramed(os);
-
-        long start = System.currentTimeMillis();
-        Location location = journal.write(os.toByteSequence(), sync);
-        long start2 = System.currentTimeMillis();
-        process(data, location);
-       long end = System.currentTimeMillis();
-       if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-               LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+       try {
+            int size = data.serializedSizeFramed();
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+            os.writeByte(data.type().getNumber());
+            data.writeFramed(os);
+    
+            long start = System.currentTimeMillis();
+            Location location = journal.write(os.toByteSequence(), sync);
+            long start2 = System.currentTimeMillis();
+            process(data, location);
+               long end = System.currentTimeMillis();
+               if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+                       LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+               }
+    
+            synchronized (indexMutex) {
+               metadata.lastUpdate = location;
+            }
+            if (!checkpointThread.isAlive()) {
+                LOG.info("KahaDB: Recovering checkpoint thread after exception");
+                startCheckpoint();
+            }
+            return location;
+       } catch (IOException ioe) {
+            LOG.error("KahaDB failed to store to Journal", ioe);
+            brokerService.handleIOException(ioe);
+           throw ioe;
        }
-
-        synchronized (indexMutex) {
-               metadata.lastUpdate = location;
-        }
-        return location;
     }
 
     /**

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
 (original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
 Mon Nov 16 15:22:19 2009
@@ -219,10 +219,7 @@
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
             }
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
+            
             if (!running) {
                 running = true;
                 thread = new Thread() {
@@ -234,6 +231,11 @@
                 thread.setDaemon(true);
                 thread.setName("ActiveMQ Data File Writer");
                 thread.start();
+                firstAsyncException = null;
+            }
+            
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
             }
 
             while ( true ) {
@@ -430,6 +432,7 @@
             } catch (Throwable ignore) {
             }
             shutdownDone.countDown();
+            running = false;
         }
     }
 

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
Mon Nov 16 15:22:19 2009
@@ -44,6 +44,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -165,8 +166,8 @@
         }
         
         void begin() {
-            diskBound = current;
-            current = null;
+           diskBound = current;
+           current = null;
         }
         
         /**
@@ -937,12 +938,18 @@
             // If there is not enough to write, wait for a notification...
 
             batch = new ArrayList<PageWrite>(writes.size());
-            // build a write batch from the current write cache. 
-            for (PageWrite write : writes.values()) {
+            // build a write batch from the current write cache.
+            Iterator<Long> it = writes.keySet().iterator();
+            while (it.hasNext()) {
+                Long key = it.next();
+                PageWrite write = writes.get(key);
                 batch.add(write);
                 // Move the current write to the diskBound write, this lets folks update the 
                 // page again without blocking for this write.
                 write.begin();
+                if (write.diskBound == null) {
+                    batch.remove(write);
+                }
             }
 
             // Grab on to the existing checkpoint latch cause once we do this write we can 
@@ -959,7 +966,11 @@
            // our write batches are going to much larger.
            Checksum checksum = new Adler32();
            for (PageWrite w : batch) {
-               checksum.update(w.diskBound, 0, pageSize);
+               try {
+                   checksum.update(w.diskBound, 0, pageSize);
+               } catch (Throwable t) {
+                   throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+               }
            }
            
            // Can we shrink the recovery buffer??


Reply via email to