Author: mgrigorov
Date: Tue Jul 19 08:30:55 2011
New Revision: 1148226

URL: http://svn.apache.org/viewvc?rev=1148226&view=rev
Log:
WICKET-3910 o.a.w.pageStore.AsynchronousDataStore#getData() returns null if the 
StoreEntryRunnable is currently running

Revert to the previous implementation of AsynchronousDataStore. 
The one based on ThreadPoolExecutor proved to be unstable.

Improve DiskDataStoreTest so that it captures the exceptions thrown by the 
runnables and fails if there is any problem.


Modified:
    
wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
    
wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java

Modified: 
wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
URL: 
http://svn.apache.org/viewvc/wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java?rev=1148226&r1=1148225&r2=1148226&view=diff
==============================================================================
--- 
wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
 (original)
+++ 
wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
 Tue Jul 19 08:30:55 2011
@@ -16,50 +16,67 @@
  */
 package org.apache.wicket.pageStore;
 
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.wicket.util.lang.Args;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Facade for {@link IDataStore} that does the actual saving asynchronously 
(in non-httpWorker
- * thread).
+ * Facade for {@link IDataStore} that does the actual saving in worker thread.
  * <p>
- * Creates an {@link Entry} for each triple (sessionId, pageId, data). Entry 
are saved using a
- * {@link java.lang.Runnable} implementation called StoreEntryRunnable. Tasks 
running and thread
- * coordination is managed using a 
{@linkjava.util.concurrent.ThreadPoolExecutor}
- * 
+ * Creates an {@link Entry} for each triple (sessionId, pageId, data) and puts 
it in
+ * {@link #entries} queue if there is room. Acts as producer.<br/>
+ * Later {@link PageSavingRunnable} reads in blocking manner from {@link 
#entries} and saves each
+ * entry. Acts as consumer.
+ * </p>
+ * It starts only one instance of {@link PageSavingRunnable} because all we 
need is to make the page
+ * storing asynchronous. We don't want to write concurrently in the wrapped 
{@link IDataStore},
+ * though it may happen in the extreme case when the queue is full. These 
cases should be avoided.
  * 
  * @author Matej Knopp
- * @author Andrea Del Bene
  */
 public class AsynchronousDataStore implements IDataStore
 {
-
        /** Log for reporting. */
        private static final Logger log = 
LoggerFactory.getLogger(AsynchronousDataStore.class);
 
        /**
-        * The maximum number of threads to be started by the {@link 
#savePagesExecutor}.
+        * The time to wait when adding an {@link Entry} into the entries. In 
millis.
+        */
+       private static final long OFFER_WAIT = 30L;
+
+       /**
+        * The time to wait for an entry to save with the wrapped {@link 
IDataStore}. In millis.
         */
-       private static final int MAX_THREADS = 1;
+       private static final long POLL_WAIT = 1000L;
+
+       /**
+        * A flag indicating that this {@link IDataStore} should stop
+        */
+       private final AtomicBoolean destroy;
 
        /**
         * The wrapped {@link IDataStore} that actually stores that pages
         */
        private final IDataStore dataStore;
 
-       private final ThreadPoolExecutor savePagesExecutor;
-
        /**
         * The queue where the entries which have to be saved are temporary 
stored
         */
-       private final BlockingQueue<Runnable> entries;
+       private final BlockingQueue<Entry> entries;
+
+       /**
+        * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval 
of {@link Entry}s which
+        * are not yet stored by the wrapped {@link IDataStore}
+        */
+       private final ConcurrentMap<String, Entry> entryMap;
 
        /**
         * Construct.
@@ -72,80 +89,96 @@ public class AsynchronousDataStore imple
        public AsynchronousDataStore(final IDataStore dataStore, final int 
capacity)
        {
                this.dataStore = dataStore;
-               entries = new LinkedBlockingQueue<Runnable>(capacity);
-               savePagesExecutor = new ThreadPoolExecutor(MAX_THREADS, 
MAX_THREADS, 1l, TimeUnit.SECONDS,
-                       entries, new RejectStoringTask());
-
+               destroy = new AtomicBoolean(false);
+               entries = new LinkedBlockingQueue<Entry>(capacity);
+               entryMap = new ConcurrentHashMap<String, Entry>();
+
+               PageSavingRunnable savingRunnable = new 
PageSavingRunnable(dataStore, entries, entryMap,
+                       destroy);
+               Thread thread = new Thread(savingRunnable, 
"Wicket-PageSavingThread");
+               thread.setDaemon(true);
+               thread.start();
        }
 
        /**
-        * @see 
org.apache.wicket.pageStore.IDataStore#getData(java.lang.String, int)
+        * @see org.apache.wicket.pageStore.IDataStore#destroy()
         */
-       public byte[] getData(final String sessionId, final int pageId)
+       public void destroy()
        {
-               Entry entry = null;
+               destroy.set(true);
 
-               for (Runnable runnable : savePagesExecutor.getQueue())
+               try
                {
-                       StoreEntryRunnable storeEntryRunnable = 
(StoreEntryRunnable)runnable;
-                       if (storeEntryRunnable != null)
+                       synchronized (destroy)
                        {
-                               Entry cursorEntry = 
storeEntryRunnable.getEntry();
-
-                               if (cursorEntry.getPageId() == pageId &&
-                                       
cursorEntry.getSessionId().equals(sessionId))
-                               {
-                                       entry = cursorEntry;
-                                       break;
-                               }
+                               destroy.wait();
                        }
                }
-
-               final byte[] data;
-               if (entry != null)
+               catch (InterruptedException e)
                {
-                       data = entry.getData();
-                       if (log.isDebugEnabled())
-                       {
-                               log.debug(
-                                       "Returning the data (with length '{}') 
of a non-stored entry with sessionId '{}' and pageId '{}'",
-                                       new Object[] { data.length, sessionId, 
pageId });
-                       }
+                       log.error(e.getMessage(), e);
                }
-               else
+
+               dataStore.destroy();
+       }
+
+       /**
+        * Little helper
+        * 
+        * @param sessionId
+        * @param id
+        * @return Entry
+        */
+       private Entry getEntry(final String sessionId, final int id)
+       {
+               return entryMap.get(getKey(sessionId, id));
+       }
+
+       /**
+        * @see 
org.apache.wicket.pageStore.IDataStore#getData(java.lang.String, int)
+        */
+       public byte[] getData(final String sessionId, final int id)
+       {
+               Entry entry = getEntry(sessionId, id);
+               if (entry != null)
                {
-                       data = dataStore.getData(sessionId, pageId);
-                       if (log.isDebugEnabled())
-                       {
-                               log.debug(
-                                       "Returning the data (with length '{}') 
of a stored entry with sessionId '{}' and pageId '{}'",
-                                       new Object[] { data.length, sessionId, 
pageId });
-                       }
+                       log.debug(
+                               "Returning the data of a non-stored entry with 
sessionId '{}' and pageId '{}'",
+                               sessionId, id);
+                       return entry.data;
                }
+               byte[] data = dataStore.getData(sessionId, id);
+
+               log.debug("Returning the data of a stored entry with sessionId 
'{}' and pageId '{}'",
+                       sessionId, id);
+
                return data;
        }
 
        /**
+        * @see org.apache.wicket.pageStore.IDataStore#isReplicated()
+        */
+       public boolean isReplicated()
+       {
+               return dataStore.isReplicated();
+       }
+
+       /**
         * @see 
org.apache.wicket.pageStore.IDataStore#removeData(java.lang.String, int)
         */
-       public void removeData(final String sessionId, final int pageId)
+       public void removeData(final String sessionId, final int id)
        {
-               for (Runnable runnable : savePagesExecutor.getQueue())
+               String key = getKey(sessionId, id);
+               if (key != null)
                {
-                       StoreEntryRunnable storeEntryRunnable = 
(StoreEntryRunnable)runnable;
-                       if (storeEntryRunnable != null)
+                       Entry entry = entryMap.remove(key);
+                       if (entry != null)
                        {
-                               Entry cursorEntry = 
storeEntryRunnable.getEntry();
-
-                               if (cursorEntry.getPageId() == pageId &&
-                                       
cursorEntry.getSessionId().equals(sessionId))
-                               {
-                                       savePagesExecutor.remove(runnable);
-                               }
+                               entries.remove(entry);
                        }
                }
 
-               dataStore.removeData(sessionId, pageId);
+               dataStore.removeData(sessionId, id);
        }
 
        /**
@@ -153,16 +186,17 @@ public class AsynchronousDataStore imple
         */
        public void removeData(final String sessionId)
        {
-               for (Runnable runnable : savePagesExecutor.getQueue())
+               for (Iterator<Entry> itor = entries.iterator(); itor.hasNext();)
                {
-                       StoreEntryRunnable storeEntryRunnable = 
(StoreEntryRunnable)runnable;
-                       if (storeEntryRunnable != null)
+                       Entry entry = itor.next();
+                       if (entry != null) // this check is not needed in JDK6
                        {
-                               Entry cursorEntry = 
storeEntryRunnable.getEntry();
+                               String entrySessionId = entry.sessionId;
 
-                               if 
(cursorEntry.getSessionId().equals(sessionId))
+                               if (sessionId.equals(entrySessionId))
                                {
-                                       savePagesExecutor.remove(runnable);
+                                       entryMap.remove(getKey(entry));
+                                       itor.remove();
                                }
                        }
                }
@@ -170,103 +204,55 @@ public class AsynchronousDataStore imple
                dataStore.removeData(sessionId);
        }
 
-
        /**
         * Save the entry in the queue if there is a room or directly pass it 
to the wrapped
         * {@link IDataStore} if there is no such
         * 
         * @see 
org.apache.wicket.pageStore.IDataStore#storeData(java.lang.String, int, byte[])
         */
-       public void storeData(final String sessionId, final int pageId, final 
byte[] data)
+       public void storeData(final String sessionId, final int id, final 
byte[] data)
        {
-               Entry entry = new Entry(sessionId, pageId, data);
-               StoreEntryRunnable storeEntryRunnable = new 
StoreEntryRunnable(entry, dataStore);
-
-               savePagesExecutor.execute(storeEntryRunnable);
-       }
-
-       /**
-        * @see org.apache.wicket.pageStore.IDataStore#destroy()
-        */
-       public void destroy()
-       {
-               log.debug("Going to shutdown the shutdown the task executor.");
-               savePagesExecutor.shutdown();
+               Entry entry = new Entry(sessionId, id, data);
                try
                {
-                       boolean stopped = 
savePagesExecutor.awaitTermination(30, TimeUnit.SECONDS);
-                       if (stopped == false)
+                       boolean added = entries.offer(entry, OFFER_WAIT, 
TimeUnit.MILLISECONDS);
+
+                       if (added == false)
+                       {
+                               log.debug("Storing synchronously page with id 
'{}' in session '{}'", id, sessionId);
+                               dataStore.storeData(sessionId, id, data);
+                       }
+                       else
                        {
-                               log.warn("Some tasks didn't stop successfully. 
They were forcefully stopped.");
-                               savePagesExecutor.shutdownNow();
+                               entryMap.put(getKey(entry), entry);
                        }
                }
                catch (InterruptedException e)
                {
-                       throw new RuntimeException(e);
+                       log.error(e.getMessage(), e);
+                       dataStore.storeData(sessionId, id, data);
                }
-
-               log.debug("Going to shutdown the shutdown the underlying 
IDataStore.");
-               dataStore.destroy();
        }
 
        /**
-        * @see org.apache.wicket.pageStore.IDataStore#isReplicated()
+        * 
+        * @param pageId
+        * @param sessionId
+        * @return generated key
         */
-       public boolean isReplicated()
+       private static String getKey(final String sessionId, final int pageId)
        {
-               return dataStore.isReplicated();
+               return pageId + ":::" + sessionId;
        }
 
        /**
-        * Rejecting task handler.
-        * <p>
-        * If the queue is full a task is rejected and must be saved 
synchronously in this handler
+        * 
+        * @param entry
+        * @return generated key
         */
-       private static class RejectStoringTask implements 
RejectedExecutionHandler
+       private static String getKey(final Entry entry)
        {
-
-               public void rejectedExecution(final Runnable runnable, final 
ThreadPoolExecutor executor)
-               {
-                       StoreEntryRunnable storeEntryRunnable = 
(StoreEntryRunnable)runnable;
-                       IDataStore dataStore = 
storeEntryRunnable.getDataStore();
-                       Entry entry = storeEntryRunnable.getEntry();
-
-                       log.debug("Queue is full. Entry '{}' will be saved 
synchronously.", entry);
-                       dataStore.storeData(entry.getSessionId(), 
entry.getPageId(), entry.getData());
-               }
-
-       }
-
-       /**
-        * Task implementation to save a page entry in a separate thread
-        */
-       private static class StoreEntryRunnable implements Runnable
-       {
-               private final Entry entry;
-               private final IDataStore dataStore;
-
-               public StoreEntryRunnable(final Entry entry, final IDataStore 
dataStore)
-               {
-                       this.entry = entry;
-                       this.dataStore = dataStore;
-               }
-
-               public void run()
-               {
-                       log.debug("Saving asynchronously: '{}'...", entry);
-                       dataStore.storeData(entry.getSessionId(), 
entry.getPageId(), entry.getData());
-               }
-
-               public Entry getEntry()
-               {
-                       return entry;
-               }
-
-               public IDataStore getDataStore()
-               {
-                       return dataStore;
-               }
+               return getKey(entry.sessionId, entry.pageId);
        }
 
        /**
@@ -285,21 +271,6 @@ public class AsynchronousDataStore imple
                        this.data = Args.notNull(data, "data");
                }
 
-               public String getSessionId()
-               {
-                       return sessionId;
-               }
-
-               public int getPageId()
-               {
-                       return pageId;
-               }
-
-               public byte[] getData()
-               {
-                       return data;
-               }
-
                @Override
                public int hashCode()
                {
@@ -337,5 +308,59 @@ public class AsynchronousDataStore imple
                {
                        return "Entry [sessionId=" + sessionId + ", pageId=" + 
pageId + "]";
                }
+
+       }
+
+       /**
+        * The thread that acts as consumer of {@link Entry}ies
+        */
+       private static class PageSavingRunnable implements Runnable
+       {
+               private static final Logger log = 
LoggerFactory.getLogger(PageSavingRunnable.class);
+
+               private final AtomicBoolean destroy;
+
+               private final BlockingQueue<Entry> entries;
+
+               private final ConcurrentMap<String, Entry> entryMap;
+
+               private final IDataStore dataStore;
+
+               private PageSavingRunnable(IDataStore dataStore, 
BlockingQueue<Entry> entries,
+                       ConcurrentMap<String, Entry> entryMap, AtomicBoolean 
destroy)
+               {
+                       this.dataStore = dataStore;
+                       this.entries = entries;
+                       this.entryMap = entryMap;
+                       this.destroy = destroy;
+               }
+
+               public void run()
+               {
+                       while (destroy.get() == false)
+                       {
+                               Entry entry = null;
+                               try
+                               {
+                                       entry = entries.poll(POLL_WAIT, 
TimeUnit.MILLISECONDS);
+                               }
+                               catch (InterruptedException e)
+                               {
+                                       log.error(e.getMessage(), e);
+                               }
+
+                               if (entry != null)
+                               {
+                                       log.debug("Saving asynchronously: 
{}...", entry);
+                                       dataStore.storeData(entry.sessionId, 
entry.pageId, entry.data);
+                                       entryMap.remove(getKey(entry));
+                               }
+                       }
+
+                       synchronized (destroy)
+                       {
+                               destroy.notify();
+                       }
+               }
        }
-}
+}
\ No newline at end of file

Modified: 
wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
URL: 
http://svn.apache.org/viewvc/wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java?rev=1148226&r1=1148225&r2=1148226&view=diff
==============================================================================
--- 
wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
 (original)
+++ 
wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
 Tue Jul 19 08:30:55 2011
@@ -143,6 +143,8 @@ public class DiskDataStoreTest extends T
 
        private final AtomicInteger saveTime = new AtomicInteger(0);
 
+       private RuntimeException exceptionThrownByThread;
+
        private String randomSessionId()
        {
                List<String> s = new ArrayList<String>(sessionCounter.keySet());
@@ -174,10 +176,35 @@ public class DiskDataStoreTest extends T
 
        private IDataStore dataStore;
 
+       /**
+        * Stores RuntimeException into a field.
+        */
+       private abstract class ExceptionCapturingRunnable implements Runnable
+       {
+               public final void run()
+               {
+                       try
+                       {
+                               doRun();
+                       }
+                       catch (RuntimeException e)
+                       {
+                               exceptionThrownByThread = e;
+                       }
+               }
+
+               /**
+                * Called by {@link #run()}. Thrown RuntimeExceptions are 
stored in a field for later
+                * check.
+                */
+               protected abstract void doRun();
+       }
+
        // Store/Save data in DataStore
-       private class SaveRunnable implements Runnable
+       private class SaveRunnable extends ExceptionCapturingRunnable
        {
-               public void run()
+               @Override
+               protected void doRun()
                {
                        File file;
 
@@ -211,9 +238,10 @@ public class DiskDataStoreTest extends T
        };
 
        // Read data from DataStore
-       private class Read1Runnable implements Runnable
+       private class Read1Runnable extends ExceptionCapturingRunnable
        {
-               public void run()
+               @Override
+               protected void doRun()
                {
                        File file;
                        while ((file = filesToRead1.poll()) != null || 
!saveDone.get())
@@ -245,9 +273,10 @@ public class DiskDataStoreTest extends T
                }
        };
 
-       private class Read2Runnable implements Runnable
+       private class Read2Runnable extends ExceptionCapturingRunnable
        {
-               public void run()
+               @Override
+               protected void doRun()
                {
                        File file;
                        while ((file = filesToRead2.poll()) != null || 
!read1Done.get())
@@ -285,17 +314,17 @@ public class DiskDataStoreTest extends T
 
                for (int i = 0; i < THREAD_COUNT; ++i)
                {
-                       new Thread(new SaveRunnable()).start();
+                       new Thread(new Read1Runnable()).start();
                }
 
                for (int i = 0; i < THREAD_COUNT; ++i)
                {
-                       new Thread(new Read1Runnable()).start();
+                       new Thread(new Read2Runnable()).start();
                }
 
                for (int i = 0; i < THREAD_COUNT; ++i)
                {
-                       new Thread(new Read2Runnable()).start();
+                       new Thread(new SaveRunnable()).start();
                }
 
                while (!(read1Done.get() && read2Done.get() && saveDone.get()))
@@ -310,6 +339,11 @@ public class DiskDataStoreTest extends T
                        }
                }
 
+               if (exceptionThrownByThread != null)
+               {
+                       throw new RuntimeException("One of the worker threads 
failed.", exceptionThrownByThread);
+               }
+
                long duration = System.currentTimeMillis() - start;
 
                log.error("Took: " + duration + " ms");


Reply via email to