Author: dejanb
Date: Fri Nov 13 12:56:38 2009
New Revision: 835833

URL: http://svn.apache.org/viewvc?rev=835833&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-2042 - additional fix for kahaDB store

Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=835833&r1=835832&r2=835833&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
 (original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
 Fri Nov 13 12:56:38 2009
@@ -21,6 +21,7 @@
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
@@ -85,6 +86,7 @@
         public final CountDownLatch latch = new CountDownLatch(1);
                private final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+        public AtomicReference<IOException> exception = new 
AtomicReference<IOException>();
 
         public WriteBatch(DataFile dataFile, int offset, WriteCommand write) 
throws IOException {
             this.dataFile = dataFile;
@@ -184,6 +186,10 @@
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
+            IOException exception = batch.exception.get(); 
+            if (exception != null) {
+               throw exception;
+            }
         }      
 
         return location;
@@ -404,11 +410,16 @@
                 wb.latch.countDown();
             }
         } catch (IOException e) {
-               if (wb != null) {
-                       wb.latch.countDown();
-               }
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
+                if (wb != null) {
+                    wb.latch.countDown();
+                    wb.exception.set(e);
+                }
+                if (nextWriteBatch != null) {
+                   nextWriteBatch.latch.countDown();
+                   nextWriteBatch.exception.set(e);
+                }
             }
         } catch (InterruptedException e) {
         } finally {


Reply via email to