Author: knopp
Date: Sat Apr 4 22:21:50 2009
New Revision: 762024
URL: http://svn.apache.org/viewvc?rev=762024&view=rev
Log: (empty)
Added:
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
(with props)
Modified:
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
Added:
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
URL:
http://svn.apache.org/viewvc/wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java?rev=762024&view=auto
==============================================================================
---
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
(added)
+++
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
Sat Apr 4 22:21:50 2009
@@ -0,0 +1,295 @@
+package org.apache.wicket.page.persistent;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link DataStore} decorator that queues data in memory and writes it to
+ * the wrapped store from a single background thread.
+ * <p>
+ * Data that is still queued is served from memory by
+ * {@link #getData(String, int)}. {@link #destroy()} flushes the queue and
+ * stops the background thread.
+ */
+public class AsynchronousDataStore implements DataStore
+{
+    /**
+     * Serializes the saving thread's poll-and-store step with the removal
+     * of queued entries.
+     */
+    private static final Object WRITE_LOCK = new Object();
+
+    /** Store that performs the actual persistence. */
+    private final DataStore dataStore;
+
+    /**
+     * Entries waiting to be written, oldest first. Also the monitor on
+     * which the saving thread waits for work.
+     */
+    private final Queue<Entry> entries = new ConcurrentLinkedQueue<Entry>();
+
+    /** Most recently queued entry for each key built by getKey. */
+    private final ConcurrentMap<String, Entry> entryMap =
+        new ConcurrentHashMap<String, Entry>();
+
+    /** Becomes true once {@link #destroy()} has been called. */
+    private final AtomicBoolean destroy = new AtomicBoolean(false);
+
+    /** Thread that drains the queue into the wrapped store. */
+    private final Thread savingThread;
+
+    /**
+     * Wraps the given store and starts the background saving thread.
+     *
+     * @param dataStore
+     *            store that queued data is eventually written to
+     */
+    public AsynchronousDataStore(DataStore dataStore)
+    {
+        this.dataStore = dataStore;
+
+        savingThread = new Thread(new PageSavingRunnable(),
+            "PageSavingThread");
+        savingThread.start();
+    }
+
+    /**
+     * Writes out everything that is still queued, waits for the saving
+     * thread to finish and then destroys the wrapped store.
+     */
+    public void destroy()
+    {
+        destroy.set(true);
+        synchronized (entries)
+        {
+            entries.notify(); // let the saving thread continue
+        }
+        try
+        {
+            // join() also returns when the thread has already finished,
+            // so its completion cannot be missed the way a notify() sent
+            // before a bare wait() could be.
+            savingThread.join();
+        }
+        catch (InterruptedException e)
+        {
+            // Hand the interrupt back to the caller instead of dropping it.
+            Thread.currentThread().interrupt();
+        }
+        dataStore.destroy();
+    }
+
+    /**
+     * Returns the queued data for the page if there is any, otherwise asks
+     * the wrapped store.
+     *
+     * @param sessionId
+     *            session the page belongs to
+     * @param id
+     *            page id
+     * @return queued data for the page, else the wrapped store's result
+     */
+    public byte[] getData(String sessionId, int id)
+    {
+        Entry entry = entryMap.get(getKey(id, sessionId));
+        if (entry != null)
+        {
+            return entry.getData();
+        }
+        return dataStore.getData(sessionId, id);
+    }
+
+    /**
+     * @return whatever the wrapped store reports
+     */
+    public boolean isReplicated()
+    {
+        return dataStore.isReplicated();
+    }
+
+    /**
+     * @return queue size above which storeData bypasses the queue and
+     *         writes to the wrapped store directly
+     */
+    protected int getMaxQueuedEntries()
+    {
+        return 100000;
+    }
+
+    /**
+     * Drops every queued version of the page and removes the page from the
+     * wrapped store.
+     *
+     * @param sessionId
+     *            session the page belongs to
+     * @param id
+     *            page id
+     */
+    public void removeData(String sessionId, int id)
+    {
+        synchronized (WRITE_LOCK)
+        {
+            entryMap.remove(getKey(id, sessionId));
+            // The same page may have been queued more than once; every
+            // copy has to go or the saving thread would write it back.
+            for (Iterator<Entry> i = entries.iterator(); i.hasNext();)
+            {
+                Entry e = i.next();
+                if (e.getPageId() == id
+                    && e.getSessionId().equals(sessionId))
+                {
+                    i.remove();
+                }
+            }
+        }
+        dataStore.removeData(sessionId, id);
+    }
+
+    /**
+     * Drops everything queued for the session and removes the session from
+     * the wrapped store.
+     *
+     * @param sessionId
+     *            session to remove
+     */
+    public void removeData(String sessionId)
+    {
+        synchronized (WRITE_LOCK)
+        {
+            for (Iterator<Entry> i = entries.iterator(); i.hasNext();)
+            {
+                Entry e = i.next();
+                if (e.getSessionId().equals(sessionId))
+                {
+                    i.remove();
+                    String key = getKey(e.getPageId(), e.getSessionId());
+                    entryMap.remove(key);
+                }
+            }
+        }
+        dataStore.removeData(sessionId);
+    }
+
+    /**
+     * Queues the data for the saving thread, or writes it to the wrapped
+     * store directly when too many entries are queued already.
+     *
+     * @param sessionId
+     *            session the page belongs to
+     * @param id
+     *            page id
+     * @param data
+     *            bytes to store for the page
+     */
+    public void storeData(String sessionId, int id, byte[] data)
+    {
+        if (entryMap.size() > getMaxQueuedEntries())
+        {
+            dataStore.storeData(sessionId, id, data);
+        }
+        else
+        {
+            Entry entry = new Entry(sessionId, id, data);
+            entryMap.put(getKey(id, sessionId), entry);
+            entries.add(entry);
+            synchronized (entries)
+            {
+                entries.notify();
+            }
+        }
+    }
+
+    /** Builds the entryMap key for a page. */
+    private String getKey(int pageId, String sessionId)
+    {
+        return pageId + "::: " + sessionId;
+    }
+
+    /** Holder for one queued write. */
+    private static class Entry
+    {
+        private final String sessionId;
+        private final int pageId;
+        private final byte data[];
+
+        public Entry(String sessionId, int pageId, byte data[])
+        {
+            this.sessionId = sessionId;
+            this.pageId = pageId;
+            this.data = data;
+        }
+
+        public String getSessionId()
+        {
+            return sessionId;
+        }
+
+        public int getPageId()
+        {
+            return pageId;
+        }
+
+        public byte[] getData()
+        {
+            return data;
+        }
+    }
+
+    /**
+     * Body of the saving thread: writes queued entries to the wrapped store
+     * until destroy() has been called and the queue is empty.
+     */
+    private class PageSavingRunnable implements Runnable
+    {
+        public void run()
+        {
+            boolean interrupted = false;
+            while (destroy.get() == false || !entries.isEmpty())
+            {
+                synchronized (entries)
+                {
+                    // Test the condition while holding the monitor that
+                    // storeData() and destroy() notify on, and again
+                    // after every wake-up, so that no notification can
+                    // slip in between the test and the wait.
+                    while (entries.isEmpty() && destroy.get() == false)
+                    {
+                        try
+                        {
+                            entries.wait();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // destroy() is the shutdown signal, so keep
+                            // serving the queue and restore the flag
+                            // when the thread ends.
+                            interrupted = true;
+                        }
+                    }
+                }
+                synchronized (WRITE_LOCK)
+                {
+                    Entry entry = entries.poll();
+                    if (entry != null)
+                    {
+                        dataStore.storeData(entry.getSessionId(),
+                            entry.getPageId(), entry.getData());
+                        // Unmap only if the key still points at this
+                        // entry; a newer version may already be queued.
+                        entryMap.remove(getKey(entry.getPageId(),
+                            entry.getSessionId()), entry);
+                    }
+                }
+            }
+            if (interrupted)
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
Propchange:
wicket/sandbox/knopp/experimental/wicket-ng/src/main/java/org/apache/wicket/page/persistent/AsynchronousDataStore.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
URL:
http://svn.apache.org/viewvc/wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java?rev=762024&r1=762023&r2=762024&view=diff
==============================================================================
---
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
(original)
+++
wicket/sandbox/knopp/experimental/wicket-ng/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
Sat Apr 4 22:21:50 2009
@@ -12,6 +12,7 @@
import junit.framework.TestCase;
+import org.apache.wicket.page.persistent.AsynchronousDataStore;
import org.apache.wicket.page.persistent.DataStore;
import org.apache.wicket.util.lang.Check;
@@ -28,7 +29,7 @@
private static final int FILE_SIZE_MAX = 1024 * 30;
private static final int MAX_SIZE_PER_SESSION = 1000000;
private static final int FILE_CHANNEL_POOL_CAPACITY = 100;
- private static final int SESSION_COUNT = 100;
+ private static final int SESSION_COUNT = 50;
private static final int FILES_COUNT = 1000;
private static final int SLEEP_MAX = 10;
private static final int THREAD_COUNT = 20;
@@ -106,6 +107,8 @@
private AtomicInteger bytesWritten = new AtomicInteger(0);
private AtomicInteger bytesRead = new AtomicInteger(0);
+ private AtomicInteger saveTime = new AtomicInteger(0);
+
private String randomSessionId()
{
List<String> s = new ArrayList<String>(sessionCounter.keySet());
@@ -128,7 +131,10 @@
{
String session = randomSessionId();
File file = new File(session, nextSessionId(session));
+ long now = System.nanoTime();
filesToSave.add(file);
+ long duration = System.nanoTime() - now;
+ saveTime.addAndGet((int) duration);
}
}
@@ -273,6 +279,8 @@
System.out.println("Took: " + duration + " ms");
System.out.println("Save: " + saveCount.intValue() + " files, "
+ bytesWritten.get() +" bytes");
System.out.println("Read: " + (read1Count.get() +
read2Count.get()) + " files, " + bytesRead.get() + " bytes");
+
+ System.out.println("Average save time (ns): " + (double)
saveTime.get() / (double) saveCount.get());
assertEquals(0, failures.get());
@@ -287,6 +295,7 @@
generateFiles();
dataStore = new DiskDataStore("app1", MAX_SIZE_PER_SESSION,
FILE_CHANNEL_POOL_CAPACITY);
+ dataStore = new AsynchronousDataStore(dataStore);
doTestDataStore();