Author: knopp
Date: Sat Apr  4 22:21:50 2009
New Revision: 762024

URL: http://svn.apache.org/viewvc?rev=762024&view=rev
Log: (empty)

Added:
    
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
   (with props)
Modified:
    
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java

Added: 
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
URL: 
http://svn.apache.org/viewvc/wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java?rev=762024&view=auto
==============================================================================
--- 
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
 (added)
+++ 
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
 Sat Apr  4 22:21:50 2009
@@ -0,0 +1,203 @@
+package org.apache.wicket.page.persistent;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsynchronousDataStore implements DataStore
+{
+       private final DataStore dataStore;
+       
+       public AsynchronousDataStore(DataStore dataStore)
+       {
+               this.dataStore = dataStore;
+               
+               new Thread(new PageSavingRunnable(), 
"PageSavingThread").start();
+       }
+
+       public void destroy()
+       {
+               destroy.set(true);
+               synchronized (entries)
+               {
+                       entries.notify(); // let the saving thread continue     
+               }               
+               try
+               {
+                       synchronized (destroy)
+                       {
+                               destroy.wait(); 
+                       }                       
+               }
+               catch (InterruptedException e)
+               {
+                       e.printStackTrace();
+               }
+               dataStore.destroy();
+       }
+
+       public byte[] getData(String sessionId, int id)
+       {
+               String key = getKey(id, sessionId);
+               Entry entry = entryMap.get(key);
+               if (entry != null)
+               {
+                       return entry.getData();
+               }
+               else
+               {
+                       return dataStore.getData(sessionId, id);
+               }
+       }
+
+       public boolean isReplicated()
+       {
+               return dataStore.isReplicated();
+       }
+
+       protected int getMaxQueuedEntries()
+       {
+               return 100000;
+       }
+       
+       public void removeData(String sessionId, int id)
+       {               
+               synchronized (WRITE_LOCK)
+               {
+                       String key = getKey(id, sessionId);
+                       Entry entry = entryMap.get(key);
+                       if (entry != null)
+                       {
+                               entryMap.remove(key);
+                               entries.remove(entry);
+                       }
+               }               
+               dataStore.removeData(sessionId, id);
+       }
+
+       public void removeData(String sessionId)
+       {
+               synchronized (WRITE_LOCK)
+               {
+                       for (Iterator<Entry> i = entries.iterator(); 
i.hasNext();)
+                       {
+                               Entry e = i.next();
+                               if (e.getSessionId().equals(sessionId))
+                               {
+                                       i.remove();
+                                       String key = getKey(e.getPageId(), 
e.getSessionId());
+                                       entryMap.remove(key);
+                               }
+                       }
+               }
+               dataStore.removeData(sessionId);
+       }
+
+       public void storeData(String sessionId, int id, byte[] data)
+       {
+               if (entryMap.size() > getMaxQueuedEntries())
+               {
+                       dataStore.storeData(sessionId, id, data);
+               }
+               else
+               {
+                       Entry entry = new Entry(sessionId, id, data);
+                       entryMap.put(getKey(id, sessionId), entry);
+                       entries.add(entry);
+                       synchronized (entries)
+                       {
+                               entries.notify();
+                       }
+               }
+       }
+
+       private Queue<Entry> entries = new ConcurrentLinkedQueue<Entry>();
+       private Map<String, Entry> entryMap = new ConcurrentHashMap<String, 
Entry>();
+       
+       private String getKey(int pageId, String sessionId)
+       {
+               return pageId + "::: " + sessionId;
+       }
+       
+       private static final Object WRITE_LOCK = new Object();
+       
+       private static class Entry 
+       {                       
+               private final String sessionId;
+               private final int pageId;
+               private final byte data[];
+               
+               public Entry(String sessionId, int pageId, byte data[])
+               {
+                       this.sessionId = sessionId;
+                       this.pageId = pageId;
+                       this.data = data;
+               }
+               
+               public String getSessionId()
+               {
+                       return sessionId;
+               }
+               
+               public int getPageId()
+               {
+                       return pageId;
+               }
+               
+               public byte[] getData()
+               {
+                       return data;
+               }                               
+       }
+       
+       private AtomicBoolean destroy = new AtomicBoolean(false);
+       
+       private class PageSavingRunnable implements Runnable
+       {
+               public void run()
+               {
+                       while(destroy.get() == false || !entries.isEmpty())
+                       {
+                               if (entries.isEmpty())
+                               {
+                                       try
+                                       {
+                                               synchronized (entries)
+                                               {
+                                                       entries.wait(); 
+                                               }                               
                
+                                       }
+                                       catch (InterruptedException e)
+                                       {
+                                               e.printStackTrace();
+                                       }
+                               }
+                               synchronized (WRITE_LOCK)
+                               {
+                                       Entry entry = entries.poll();
+                                       if (entry != null)
+                                       {
+                                               
dataStore.storeData(entry.getSessionId(), entry.getPageId(), entry.getData());
+                                               String key = 
getKey(entry.getPageId(), entry.getSessionId());
+                                               entryMap.remove(key);
+                                       }
+                               }                               
+                       }
+                       try
+                       {
+                               Thread.sleep(10);
+                       }
+                       catch (InterruptedException e)
+                       {
+                               e.printStackTrace();
+                       }
+                       synchronized (destroy)
+                       {
+                               destroy.notify();       
+                       }                       
+               }
+       };
+}

Propchange: 
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
URL: 
http://svn.apache.org/viewvc/wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java?rev=762024&r1=762023&r2=762024&view=diff
==============================================================================
--- 
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
 (original)
+++ 
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
 Sat Apr  4 22:21:50 2009
@@ -12,6 +12,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.wicket.page.persistent.AsynchronousDataStore;
 import org.apache.wicket.page.persistent.DataStore;
 import org.apache.wicket.util.lang.Check;
 
@@ -28,7 +29,7 @@
        private static final int FILE_SIZE_MAX = 1024 * 30;
        private static final int MAX_SIZE_PER_SESSION = 1000000;
        private static final int FILE_CHANNEL_POOL_CAPACITY = 100;
-       private static final int SESSION_COUNT = 100;
+       private static final int SESSION_COUNT = 50;
        private static final int FILES_COUNT = 1000;
        private static final int SLEEP_MAX = 10;
        private static final int THREAD_COUNT = 20;
@@ -106,6 +107,8 @@
        private AtomicInteger bytesWritten = new AtomicInteger(0);
        private AtomicInteger bytesRead = new AtomicInteger(0);
        
+       private AtomicInteger saveTime = new AtomicInteger(0);
+       
        private String randomSessionId()
        {
                List<String> s = new ArrayList<String>(sessionCounter.keySet());
@@ -128,7 +131,10 @@
                {
                        String session = randomSessionId();
                        File file = new File(session, nextSessionId(session));
+                       long now = System.nanoTime();
                        filesToSave.add(file);
+                       long duration = System.nanoTime() - now;
+                       saveTime.addAndGet((int) duration);
                }
                
        }
@@ -273,6 +279,8 @@
                System.out.println("Took: " + duration + " ms");
                System.out.println("Save: " + saveCount.intValue() + " files, " 
+ bytesWritten.get() +" bytes");
                System.out.println("Read: " + (read1Count.get() + 
read2Count.get()) + " files, " + bytesRead.get() + " bytes");
+
+               System.out.println("Average save time (ns): " + (double) 
saveTime.get() / (double) saveCount.get());
                
                assertEquals(0, failures.get());
                
@@ -287,6 +295,7 @@
                generateFiles();                
                
                dataStore = new DiskDataStore("app1", MAX_SIZE_PER_SESSION, 
FILE_CHANNEL_POOL_CAPACITY);
+               dataStore = new AsynchronousDataStore(dataStore);
                
                doTestDataStore();
                


Reply via email to