adjust format & doc
Project: http://git-wip-us.apache.org/repos/asf/wicket/repo Commit: http://git-wip-us.apache.org/repos/asf/wicket/commit/0e975d72 Tree: http://git-wip-us.apache.org/repos/asf/wicket/tree/0e975d72 Diff: http://git-wip-us.apache.org/repos/asf/wicket/diff/0e975d72 Branch: refs/heads/master Commit: 0e975d729c9dd85dc3438601a6901893bf0df121 Parents: 2ffd587 Author: Manuel Barzi <[email protected]> Authored: Wed Mar 15 16:26:10 2017 +0100 Committer: Manuel Barzi <[email protected]> Committed: Wed Mar 15 16:26:10 2017 +0100 ---------------------------------------------------------------------- .../apache/wicket/pageStore/AsyncPageStore.java | 373 -------------- .../wicket/pageStore/AsynchronousPageStore.java | 358 +++++++++++++ .../wicket/pageStore/AsyncPageStoreTest.java | 497 ------------------ .../pageStore/AsynchronousPageStoreTest.java | 498 +++++++++++++++++++ 4 files changed, 856 insertions(+), 870 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/wicket/blob/0e975d72/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java deleted file mode 100644 index 213164d..0000000 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java +++ /dev/null @@ -1,373 +0,0 @@ -package org.apache.wicket.pageStore; - -import java.io.Serializable; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.wicket.page.IManageablePage; -import org.apache.wicket.util.lang.Args; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Facade for {@link IPageStore} that does the actual saving in worker thread. - * <p> - * Creates an {@link Entry} for each double (sessionId, page) and puts it in {@link #entries} queue - * if there is room. Acts as producer.<br/> - * Later {@link PageSavingRunnable} reads in blocking manner from {@link #entries} and saves each - * entry. Acts as consumer. - * </p> - * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page - * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore}, - * though it may happen in the extreme case when the queue is full. These cases should be avoided. - * - * Based on AsynchronousDataStore (@author Matej Knopp). - * - * @author manuelbarzi - */ -public class AsyncPageStore implements IPageStore -{ - - /** Log for reporting. */ - private static final Logger log = LoggerFactory.getLogger(AsyncPageStore.class); - - /** - * The time to wait when adding an {@link Entry} into the entries. In millis. - */ - private static final long OFFER_WAIT = 30L; - - /** - * The time to wait for an entry to save with the wrapped {@link IPageStore} . In millis. - */ - private static final long POLL_WAIT = 1000L; - - /** - * The page saving thread. - */ - private final Thread pageSavingThread; - - /** - * The wrapped {@link IPageStore} that actually stores that pages - */ - private final IPageStore pageStore; - - /** - * The queue where the entries which have to be saved are temporary stored - */ - private final BlockingQueue<Entry> entries; - - /** - * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval of {@link Entry}s which - * are not yet stored by the wrapped {@link IPageStore} - */ - private final ConcurrentMap<String, Entry> entryMap; - - /** - * Construct. - * - * @param pageStore - * the wrapped {@link IPageStore} that actually saved the page - * @param capacity - * the capacity of the queue that delays the saving - */ - public AsyncPageStore(final IPageStore pageStore, final int capacity) - { - this.pageStore = pageStore; - entries = new LinkedBlockingQueue<Entry>(capacity); - entryMap = new ConcurrentHashMap<String, Entry>(); - - PageSavingRunnable savingRunnable = new PageSavingRunnable(pageStore, entries, entryMap); - pageSavingThread = new Thread(savingRunnable, "AsyncPageStore-PageSavingThread"); - pageSavingThread.setDaemon(true); - pageSavingThread.start(); - } - - /** - * Little helper - * - * @param sessionId - * @param pageId - * @return Entry - */ - private Entry getEntry(final String sessionId, final int pageId) - { - return entryMap.get(getKey(sessionId, pageId)); - } - - /** - * - * @param pageId - * @param sessionId - * @return generated key - */ - private static String getKey(final String sessionId, final int pageId) - { - return pageId + ":::" + sessionId; - } - - /** - * - * @param entry - * @return generated key - */ - private static String getKey(final Entry entry) - { - return getKey(entry.sessionId, entry.page.getPageId()); - } - - /** - * The structure used for an entry in the queue - */ - private static class Entry - { - private final String sessionId; - private final IManageablePage page; - - public Entry(final String sessionId, final IManageablePage page) - { - this.sessionId = Args.notNull(sessionId, "sessionId"); - this.page = Args.notNull(page, "page"); - } - - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = prime * result + page.getPageId(); - result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Entry other = (Entry)obj; - if (page.getPageId() != other.page.getPageId()) - return false; - if (sessionId == null) - { - if (other.sessionId != null) - return false; - } - else if (!sessionId.equals(other.sessionId)) - return false; - return true; - } - - @Override - public String toString() - { - return "Entry [sessionId=" + sessionId + ", pageId=" + page.getPageId() + "]"; - } - - } - - /** - * The thread that acts as consumer of {@link Entry}ies - */ - private static class PageSavingRunnable implements Runnable - { - private static final Logger log = LoggerFactory.getLogger(PageSavingRunnable.class); - - private final BlockingQueue<Entry> entries; - - private final ConcurrentMap<String, Entry> entryMap; - - private final IPageStore pageStore; - - private PageSavingRunnable(IPageStore pageStore, BlockingQueue<Entry> entries, - ConcurrentMap<String, Entry> entryMap) - { - this.pageStore = pageStore; - this.entries = entries; - this.entryMap = entryMap; - } - - @Override - public void run() - { - while (!Thread.interrupted()) - { - Entry entry = null; - try - { - entry = entries.poll(POLL_WAIT, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - - if (entry != null) - { - log.debug("Saving asynchronously: {}...", entry); - pageStore.storePage(entry.sessionId, entry.page); - entryMap.remove(getKey(entry)); - } - } - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#destroy() - */ - @Override - public void destroy() - { - if (pageSavingThread.isAlive()) - { - pageSavingThread.interrupt(); - try - { - pageSavingThread.join(); - } - catch (InterruptedException e) - { - log.error(e.getMessage(), e); - } - } - - pageStore.destroy(); - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#getPage(java.lang.String, int) - */ - @Override - public IManageablePage getPage(String sessionId, int pageId) - { - Entry entry = getEntry(sessionId, pageId); - if (entry != null) - { - log.debug( - "Returning the page of a non-stored entry with session id '{}' and page id '{}'", - sessionId, pageId); - return entry.page; - } - IManageablePage page = pageStore.getPage(sessionId, pageId); - - log.debug("Returning the page of a stored entry with session id '{}' and page id '{}'", - sessionId, pageId); - - return page; - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#removePage(java.lang.String, int) - */ - @Override - public void removePage(String sessionId, int pageId) - { - String key = getKey(sessionId, pageId); - if (key != null) - { - Entry entry = entryMap.remove(key); - if (entry != null) - { - entries.remove(entry); - } - } - - pageStore.removePage(sessionId, pageId); - - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#storePage(java.lang.String, - * org.apache.wicket.page.IManageablePage) - */ - @Override - public void storePage(String sessionId, IManageablePage page) - { - Entry entry = new Entry(sessionId, page); - String key = getKey(entry); - entryMap.put(key, entry); - - try - { - if (entries.offer(entry, OFFER_WAIT, TimeUnit.MILLISECONDS)) - { - log.debug("Offered for storing asynchronously page with id '{}' in session '{}'", - page.getPageId(), sessionId); - } - else - { - log.debug("Storing synchronously page with id '{}' in session '{}'", - page.getPageId(), sessionId); - entryMap.remove(key); - pageStore.storePage(sessionId, page); - } - } - catch (InterruptedException e) - { - log.error(e.getMessage(), e); - entryMap.remove(key); - pageStore.storePage(sessionId, page); - } - - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#unbind(java.lang.String) - */ - @Override - public void unbind(String sessionId) - { - pageStore.unbind(sessionId); - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#prepareForSerialization(java.lang. String, - * java.io.Serializable) - */ - @Override - public Serializable prepareForSerialization(String sessionId, Serializable page) - { - return pageStore.prepareForSerialization(sessionId, page); - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#restoreAfterSerialization(java.io. Serializable) - */ - @Override - public Object restoreAfterSerialization(Serializable serializable) - { - return pageStore.restoreAfterSerialization(serializable); - } - - /* - * (non-Javadoc) - * - * @see org.apache.wicket.pageStore.IPageStore#convertToPage(java.lang.Object) - */ - @Override - public IManageablePage convertToPage(Object page) - { - return pageStore.convertToPage(page); - } - -} http://git-wip-us.apache.org/repos/asf/wicket/blob/0e975d72/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java new file mode 100644 index 0000000..087257c --- /dev/null +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java @@ -0,0 +1,358 @@ +package org.apache.wicket.pageStore; + +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.wicket.page.IManageablePage; +import org.apache.wicket.pageStore.IPageStore; +import org.apache.wicket.util.lang.Args; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Facade for {@link IPageStore} that does the actual saving in worker thread. + * <p> + * Creates an {@link Entry} for each double (sessionId, page) and puts it in {@link #entries} queue + * if there is room. Acts as producer.<br/> + * Later {@link PageSavingRunnable} reads in blocking manner from {@link #entries} and saves each + * entry. Acts as consumer. + * </p> + * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page + * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore}, + * though it may happen in the extreme case when the queue is full. These cases should be avoided. + * + * Based on AsynchronousDataStore (@author Matej Knopp). + * + * @author manuelbarzi + */ +public class AsynchronousPageStore implements IPageStore +{ + + /** Log for reporting. */ + private static final Logger log = LoggerFactory.getLogger(AsynchronousPageStore.class); + + /** + * The time to wait when adding an {@link Entry} into the entries. In millis. + */ + private static final long OFFER_WAIT = 30L; + + /** + * The time to wait for an entry to save with the wrapped {@link IPageStore} . In millis. + */ + private static final long POLL_WAIT = 1000L; + + /** + * The page saving thread. + */ + private final Thread pageSavingThread; + + /** + * The wrapped {@link IPageStore} that actually stores that pages + */ + private final IPageStore pageStore; + + /** + * The queue where the entries which have to be saved are temporary stored + */ + private final BlockingQueue<Entry> entries; + + /** + * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval of {@link Entry}s which + * are not yet stored by the wrapped {@link IPageStore} + */ + private final ConcurrentMap<String, Entry> entryMap; + + /** + * Construct. + * + * @param pageStore + * the wrapped {@link IPageStore} that actually saved the page + * @param capacity + * the capacity of the queue that delays the saving + */ + public AsynchronousPageStore(final IPageStore pageStore, final int capacity) + { + this.pageStore = pageStore; + entries = new LinkedBlockingQueue<Entry>(capacity); + entryMap = new ConcurrentHashMap<String, Entry>(); + + PageSavingRunnable savingRunnable = new PageSavingRunnable(pageStore, entries, entryMap); + pageSavingThread = new Thread(savingRunnable, "AsyncPageStore-PageSavingThread"); + pageSavingThread.setDaemon(true); + pageSavingThread.start(); + } + + /** + * Little helper + * + * @param sessionId + * @param pageId + * @return Entry + */ + private Entry getEntry(final String sessionId, final int pageId) + { + return entryMap.get(getKey(sessionId, pageId)); + } + + /** + * + * @param pageId + * @param sessionId + * @return generated key + */ + private static String getKey(final String sessionId, final int pageId) + { + return pageId + ":::" + sessionId; + } + + /** + * + * @param entry + * @return generated key + */ + private static String getKey(final Entry entry) + { + return getKey(entry.sessionId, entry.page.getPageId()); + } + + /** + * The structure used for an entry in the queue + */ + private static class Entry + { + private final String sessionId; + private final IManageablePage page; + + public Entry(final String sessionId, final IManageablePage page) + { + this.sessionId = Args.notNull(sessionId, "sessionId"); + this.page = Args.notNull(page, "page"); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + page.getPageId(); + result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Entry other = (Entry)obj; + if (page.getPageId() != other.page.getPageId()) + return false; + if (sessionId == null) + { + if (other.sessionId != null) + return false; + } + else if (!sessionId.equals(other.sessionId)) + return false; + return true; + } + + @Override + public String toString() + { + return "Entry [sessionId=" + sessionId + ", pageId=" + page.getPageId() + "]"; + } + + } + + /** + * The thread that acts as consumer of {@link Entry}ies + */ + private static class PageSavingRunnable implements Runnable + { + private static final Logger log = LoggerFactory.getLogger(PageSavingRunnable.class); + + private final BlockingQueue<Entry> entries; + + private final ConcurrentMap<String, Entry> entryMap; + + private final IPageStore pageStore; + + private PageSavingRunnable(IPageStore pageStore, BlockingQueue<Entry> entries, + ConcurrentMap<String, Entry> entryMap) + { + this.pageStore = pageStore; + this.entries = entries; + this.entryMap = entryMap; + } + + @Override + public void run() + { + while (!Thread.interrupted()) + { + Entry entry = null; + try + { + entry = entries.poll(POLL_WAIT, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + + if (entry != null) + { + log.debug("Saving asynchronously: {}...", entry); + pageStore.storePage(entry.sessionId, entry.page); + entryMap.remove(getKey(entry)); + } + } + } + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#destroy() + */ + @Override + public void destroy() + { + if (pageSavingThread.isAlive()) + { + pageSavingThread.interrupt(); + try + { + pageSavingThread.join(); + } + catch (InterruptedException e) + { + log.error(e.getMessage(), e); + } + } + + pageStore.destroy(); + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#getPage(java.lang.String, int) + */ + @Override + public IManageablePage getPage(String sessionId, int pageId) + { + Entry entry = getEntry(sessionId, pageId); + if (entry != null) + { + log.debug( + "Returning the page of a non-stored entry with session id '{}' and page id '{}'", + sessionId, pageId); + return entry.page; + } + IManageablePage page = pageStore.getPage(sessionId, pageId); + + log.debug("Returning the page of a stored entry with session id '{}' and page id '{}'", + sessionId, pageId); + + return page; + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#removePage(java.lang.String, int) + */ + @Override + public void removePage(String sessionId, int pageId) + { + String key = getKey(sessionId, pageId); + if (key != null) + { + Entry entry = entryMap.remove(key); + if (entry != null) + { + entries.remove(entry); + } + } + + pageStore.removePage(sessionId, pageId); + + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#storePage(java.lang.String, + * org.apache.wicket.page.IManageablePage) + */ + @Override + public void storePage(String sessionId, IManageablePage page) + { + Entry entry = new Entry(sessionId, page); + String key = getKey(entry); + entryMap.put(key, entry); + + try + { + if (entries.offer(entry, OFFER_WAIT, TimeUnit.MILLISECONDS)) + { + log.debug("Offered for storing asynchronously page with id '{}' in session '{}'", + page.getPageId(), sessionId); + } + else + { + log.debug("Storing synchronously page with id '{}' in session '{}'", + page.getPageId(), sessionId); + entryMap.remove(key); + pageStore.storePage(sessionId, page); + } + } + catch (InterruptedException e) + { + log.error(e.getMessage(), e); + entryMap.remove(key); + pageStore.storePage(sessionId, page); + } + + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#unbind(java.lang.String) + */ + @Override + public void unbind(String sessionId) + { + pageStore.unbind(sessionId); + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#prepareForSerialization(java.lang. String, + * java.io.Serializable) + */ + @Override + public Serializable prepareForSerialization(String sessionId, Serializable page) + { + return pageStore.prepareForSerialization(sessionId, page); + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#restoreAfterSerialization(java.io. Serializable) + */ + @Override + public Object restoreAfterSerialization(Serializable serializable) + { + return pageStore.restoreAfterSerialization(serializable); + } + + /** + * @see org.apache.wicket.pageStore.IPageStore#convertToPage(java.lang.Object) + */ + @Override + public IManageablePage convertToPage(Object page) + { + return pageStore.convertToPage(page); + } + +} http://git-wip-us.apache.org/repos/asf/wicket/blob/0e975d72/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java b/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java deleted file mode 100644 index 9849471..0000000 --- a/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java +++ /dev/null @@ -1,497 +0,0 @@ -package org.apache.wicket.pageStore; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.lang3.RandomUtils; -import org.apache.wicket.page.IManageablePage; -import org.apache.wicket.serialize.ISerializer; -import org.apache.wicket.serialize.java.DeflatedJavaSerializer; -import org.apache.wicket.util.file.File; -import org.apache.wicket.util.lang.Bytes; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Stopwatch; - -/** - * AsyncPageStoreTest - * - * @author manuelbarzi - * - */ -public class AsyncPageStoreTest -{ - - /** Log for reporting. */ - private static final Logger log = LoggerFactory.getLogger(AsyncPageStoreTest.class); - - @SuppressWarnings("serial") - private static class DummyPage implements IManageablePage - { - - private int pageId; - private long writeMillis; - private long readMillis; - private String sessionId; - - private DummyPage(int pageId, long writeMillis, long readMillis, String sessionId) - { - this.pageId = pageId; - this.writeMillis = writeMillis; - this.readMillis = readMillis; - this.sessionId = sessionId; - } - - @Override - public boolean isPageStateless() - { - return false; - } - - @Override - public int getPageId() - { - return pageId; - } - - @Override - public void detach() - { - } - - @Override - public boolean setFreezePageId(boolean freeze) - { - return false; - } - - /** - * @param s - * @throws IOException - */ - private void writeObject(java.io.ObjectOutputStream s) throws IOException - { - log.debug("serializing page {} for {}ms (session {})", getPageId(), writeMillis, - sessionId); - try - { - Thread.sleep(writeMillis); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - - s.writeInt(pageId); - s.writeLong(writeMillis); - s.writeLong(readMillis); - s.writeObject(sessionId); - } - - private void readObject(java.io.ObjectInputStream s) - throws IOException, ClassNotFoundException - { - log.debug("deserializing page {} for {}ms (session {})", getPageId(), writeMillis, - sessionId); - try - { - Thread.sleep(readMillis); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - - pageId = s.readInt(); - writeMillis = s.readLong(); - readMillis = s.readLong(); - sessionId = (String)s.readObject(); - } - - public String toString() - { - return "DummyPage[pageId = " + pageId + ", writeMillis = " + writeMillis + - ", readMillis = " + readMillis + ", sessionId = " + sessionId + ", hashCode = " + - hashCode() + "]"; - } - } - - /** - * Store works fully async when number of pages handled never exceeds the async-storage - * capacity. - * - * @throws InterruptedException - */ - @Test - public void storeBehavesAsyncWhenNotExceedingStoreCapacity() throws InterruptedException - { - int sessions = 2; - int pages = 5; - long writeMillis = 2000; - long readMillis = 1500; - int asyncPageStoreCapacity = pages * sessions; - - List<Metrics> results = runTest(sessions, pages, writeMillis, readMillis, - asyncPageStoreCapacity); - - for (Metrics metrics : results) - System.out.println(metrics); - - for (Metrics metrics : results) - { - assertEquals(metrics.storedPage, metrics.restoredPage); - assertTrue(metrics.storingMillis < writeMillis); - assertTrue(metrics.restoringMillis < readMillis); - } - } - - /** - * Store behaves sync from when number of pages handled exceeds the given async-storage - * capacity. It works async until the number of pages reaches the limit (capacity). - * - * @throws InterruptedException - */ - @Test - public void storeBehavesSyncFromWhenExceedingStoreCapacity() throws InterruptedException - { - int sessions = 2; - int pages = 5; - long writeMillis = 2000; - long readMillis = 1500; - int asyncPageStoreCapacity = pages; // only up to the first round of - // pages - - List<Metrics> results = runTest(sessions, pages, writeMillis, readMillis, - asyncPageStoreCapacity); - - for (Metrics metrics : results) - System.out.println(metrics); - - int sync = 0; - - for (int i = 0; i < results.size(); i++) - { - Metrics metrics = results.get(i); - - assertEquals(metrics.storedPage.sessionId, metrics.restoredPage.sessionId); - assertEquals(metrics.storedPage.getPageId(), metrics.restoredPage.getPageId()); - - if (!metrics.storedPage.equals(metrics.restoredPage)) - { - assertTrue(metrics.storingMillis >= metrics.storedPage.writeMillis); - sync++; - } - } - - assertTrue(sync > 0); - } - - // test run - - private class Metrics - { - private DummyPage storedPage; - private long storingMillis; - private DummyPage restoredPage; - private long restoringMillis; - - public String toString() - { - return "Metrics[storedPage = " + storedPage + ", storingMillis = " + storingMillis + - ", restoredPage = " + restoredPage + ", restoringMillis = " + restoringMillis + "]"; - } - } - - private List<Metrics> runTest(int sessions, int pages, long writeMillis, long readMillis, - int asyncPageStoreCapacity) throws InterruptedException - { - - List<Metrics> results = new ArrayList<>(); - - final CountDownLatch lock = new CountDownLatch(pages * sessions); - - ISerializer serializer = new DeflatedJavaSerializer("applicationKey"); - // ISerializer serializer = new DummySerializer(); - IDataStore dataStore = new DiskDataStore("applicationName", new File("./target"), - Bytes.bytes(10000l)); - IPageStore pageStore = new DefaultPageStore(serializer, dataStore, 0) - { - // IPageStore pageStore = new DummyPageStore(new - // File("target/store")) { - - @Override - public void storePage(String sessionId, IManageablePage page) - { - - super.storePage(sessionId, page); - - lock.countDown(); - } - }; - - IPageStore asyncPageStore = new AsyncPageStore(pageStore, asyncPageStoreCapacity); - - Stopwatch stopwatch = Stopwatch.createUnstarted(); - - for (int pageId = 1; pageId <= pages; pageId++) - { - for (int sessionId = 1; sessionId <= sessions; sessionId++) - { - String session = String.valueOf(sessionId); - Metrics metrics = new Metrics(); - - stopwatch.reset(); - DummyPage page = new DummyPage(pageId, around(writeMillis), around(readMillis), - session); - stopwatch.start(); - asyncPageStore.storePage(session, page); - metrics.storedPage = page; - metrics.storingMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS); - - stopwatch.reset(); - stopwatch.start(); - metrics.restoredPage = DummyPage.class - .cast(asyncPageStore.getPage(session, pageId)); - metrics.restoringMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS); - - results.add(metrics); - } - } - - lock.await(pages * sessions * (writeMillis + readMillis), TimeUnit.MILLISECONDS); - - return results; - } - - private long around(long target) - { - return RandomUtils.nextLong((long)(target * .9), (long)(target * 1.1)); - } - - // other aux dummy impls for testing - - private class DummySerializer implements ISerializer - { - - @Override - public byte[] serialize(Object obj) - { - ByteArrayOutputStream bos = null; - ObjectOutput out = null; - try - { - bos = new ByteArrayOutputStream(); - out = new ObjectOutputStream(bos); - out.writeObject(obj); - return bos.toByteArray(); - } - catch (FileNotFoundException e) - { - throw new RuntimeException(e); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - finally - { - try - { - if (bos != null) - bos.close(); - if (out != null) - out.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - @Override - public Object deserialize(byte[] bytes) - { - ByteArrayInputStream bis = null; - ObjectInput in = null; - try - { - bis = new ByteArrayInputStream(bytes); - in = new ObjectInputStream(bis); - return in.readObject(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - catch (ClassNotFoundException e) - { - throw new RuntimeException(e); - } - finally - { - try - { - if (bis != null) - bis.close(); - if (in != null) - in.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - } - - private class DummyPageStore implements IPageStore - { - - private File folder; - - private DummyPageStore(File folder) - { - folder.mkdirs(); - this.folder = folder; - } - - private File getPageFile(String sessionId, int pageId) - { - return new File(folder.getAbsolutePath() + "/" + sessionId + "-" + pageId + ".page"); - } - - private void toFile(Object obj, File file) - { - FileOutputStream fos = null; - ObjectOutput oo = null; - try - { - fos = new FileOutputStream(file); - oo = new ObjectOutputStream(fos); - oo.writeObject(obj); - } - catch (FileNotFoundException e) - { - throw new RuntimeException(e); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - finally - { - try - { - if (fos != null) - fos.close(); - if (oo != null) - oo.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - private Object fromFile(File file) - { - FileInputStream fis = null; - ObjectInput oi = null; - try - { - fis = new FileInputStream(file); - oi = new ObjectInputStream(fis); - return oi.readObject(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - catch (ClassNotFoundException e) - { - throw new RuntimeException(e); - } - finally - { - try - { - if (fis != null) - fis.close(); - if (oi != null) - oi.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - - @Override - public void destroy() - { - } - - @Override - public IManageablePage getPage(String sessionId, int pageId) - { - return (IManageablePage)fromFile(getPageFile(sessionId, pageId)); - } - - @Override - public void removePage(String sessionId, int pageId) - { - } - - @Override - public void storePage(String sessionId, IManageablePage page) - { - toFile(page, getPageFile(sessionId, page.getPageId())); - } - - @Override - public void unbind(String sessionId) - { - } - - @Override - public Serializable prepareForSerialization(String sessionId, Serializable page) - { - return null; - } - - @Override - public Object restoreAfterSerialization(Serializable serializable) - { - return null; - } - - @Override - public IManageablePage convertToPage(Object page) - { - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/wicket/blob/0e975d72/wicket-core/src/test/java/org/apache/wicket/pageStore/AsynchronousPageStoreTest.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/test/java/org/apache/wicket/pageStore/AsynchronousPageStoreTest.java b/wicket-core/src/test/java/org/apache/wicket/pageStore/AsynchronousPageStoreTest.java new file mode 100644 index 0000000..53da7d6 --- /dev/null +++ b/wicket-core/src/test/java/org/apache/wicket/pageStore/AsynchronousPageStoreTest.java @@ -0,0 +1,498 @@ +package org.apache.wicket.pageStore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.wicket.page.IManageablePage; +import org.apache.wicket.serialize.ISerializer; +import org.apache.wicket.serialize.java.DeflatedJavaSerializer; +import org.apache.wicket.util.file.File; +import org.apache.wicket.util.lang.Bytes; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; + +/** + * AsynchronousPageStoreTest + * + * @author manuelbarzi + * + */ +public class AsynchronousPageStoreTest +{ + + /** Log for reporting. */ + private static final Logger log = LoggerFactory.getLogger(AsynchronousPageStoreTest.class); + + @SuppressWarnings("serial") + private static class DummyPage implements IManageablePage + { + + private int pageId; + private long writeMillis; + private long readMillis; + private String sessionId; + + private DummyPage(int pageId, long writeMillis, long readMillis, String sessionId) + { + this.pageId = pageId; + this.writeMillis = writeMillis; + this.readMillis = readMillis; + this.sessionId = sessionId; + } + + @Override + public boolean isPageStateless() + { + return false; + } + + @Override + public int getPageId() + { + return pageId; + } + + @Override + public void detach() + { + } + + @Override + public boolean setFreezePageId(boolean freeze) + { + return false; + } + + /** + * @param s + * @throws IOException + */ + private void writeObject(java.io.ObjectOutputStream s) throws IOException + { + log.debug("serializing page {} for {}ms (session {})", getPageId(), writeMillis, + sessionId); + try + { + Thread.sleep(writeMillis); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + s.writeInt(pageId); + s.writeLong(writeMillis); + s.writeLong(readMillis); + s.writeObject(sessionId); + } + + private void readObject(java.io.ObjectInputStream s) + throws IOException, ClassNotFoundException + { + log.debug("deserializing page {} for {}ms (session {})", getPageId(), writeMillis, + sessionId); + try + { + Thread.sleep(readMillis); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + pageId = s.readInt(); + writeMillis = s.readLong(); + readMillis = s.readLong(); + sessionId = (String)s.readObject(); + } + + public String toString() + { + return "DummyPage[pageId = " + pageId + ", writeMillis = " + writeMillis + + ", readMillis = " + readMillis + ", sessionId = " + sessionId + ", hashCode = " + + hashCode() + "]"; + } + } + + /** + * Store works fully asynchronous when number of pages handled never exceeds the + * asynchronous-storage capacity. + * + * @throws InterruptedException + */ + @Test + public void storeBehavesAsyncWhenNotExceedingStoreCapacity() throws InterruptedException + { + int sessions = 2; + int pages = 5; + long writeMillis = 2000; + long readMillis = 1500; + int asyncPageStoreCapacity = pages * sessions; + + List<Metrics> results = runTest(sessions, pages, writeMillis, readMillis, + asyncPageStoreCapacity); + + for (Metrics metrics : results) + System.out.println(metrics); + + for (Metrics metrics : results) + { + assertEquals(metrics.storedPage, metrics.restoredPage); + assertTrue(metrics.storingMillis < writeMillis); + assertTrue(metrics.restoringMillis < readMillis); + } + } + + /** + * Store behaves synchronous from when number of pages handled exceeds the given + * asynchronous-storage capacity. It works asynchronous until the number of pages reaches the + * limit (capacity). + * + * @throws InterruptedException + */ + @Test + public void storeBehavesSyncFromWhenExceedingStoreCapacity() throws InterruptedException + { + int sessions = 2; + int pages = 5; + long writeMillis = 2000; + long readMillis = 1500; + int asyncPageStoreCapacity = pages; // only up to the first round of + // pages + + List<Metrics> results = runTest(sessions, pages, writeMillis, readMillis, + asyncPageStoreCapacity); + + for (Metrics metrics : results) + System.out.println(metrics); + + int sync = 0; + + for (int i = 0; i < results.size(); i++) + { + Metrics metrics = results.get(i); + + assertEquals(metrics.storedPage.sessionId, metrics.restoredPage.sessionId); + assertEquals(metrics.storedPage.getPageId(), metrics.restoredPage.getPageId()); + + if (!metrics.storedPage.equals(metrics.restoredPage)) + { + assertTrue(metrics.storingMillis >= metrics.storedPage.writeMillis); + sync++; + } + } + + assertTrue(sync > 0); + } + + // test run + + private class Metrics + { + private DummyPage storedPage; + private long storingMillis; + private DummyPage restoredPage; + private long restoringMillis; + + public String toString() + { + return "Metrics[storedPage = " + storedPage + ", storingMillis = " + storingMillis + + ", restoredPage = " + restoredPage + ", restoringMillis = " + restoringMillis + "]"; + } + } + + private List<Metrics> runTest(int sessions, int pages, long writeMillis, long readMillis, + int asyncPageStoreCapacity) throws InterruptedException + { + + List<Metrics> results = new ArrayList<>(); + + final CountDownLatch lock = new CountDownLatch(pages * sessions); + + ISerializer serializer = new DeflatedJavaSerializer("applicationKey"); + // ISerializer serializer = new DummySerializer(); + IDataStore dataStore = new DiskDataStore("applicationName", new File("./target"), + Bytes.bytes(10000l)); + IPageStore pageStore = new DefaultPageStore(serializer, dataStore, 0) + { + // IPageStore pageStore = new DummyPageStore(new + // File("target/store")) { + + @Override + public void storePage(String sessionId, IManageablePage page) + { + + super.storePage(sessionId, page); + + lock.countDown(); + } + }; + + IPageStore asyncPageStore = new AsynchronousPageStore(pageStore, asyncPageStoreCapacity); + + Stopwatch stopwatch = Stopwatch.createUnstarted(); + + for (int pageId = 1; pageId <= pages; pageId++) + { + for (int sessionId = 1; sessionId <= sessions; sessionId++) + { + String session = String.valueOf(sessionId); + Metrics metrics = new Metrics(); + + stopwatch.reset(); + DummyPage page = new DummyPage(pageId, around(writeMillis), around(readMillis), + session); + stopwatch.start(); + asyncPageStore.storePage(session, page); + metrics.storedPage = page; + metrics.storingMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + stopwatch.reset(); + stopwatch.start(); + metrics.restoredPage = DummyPage.class + .cast(asyncPageStore.getPage(session, pageId)); + metrics.restoringMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + results.add(metrics); + } + } + + lock.await(pages * sessions * (writeMillis + readMillis), TimeUnit.MILLISECONDS); + + return results; + } + + private long around(long target) + { + return RandomUtils.nextLong((long)(target * .9), (long)(target * 1.1)); + } + + // other aux dummy impls for testing + + private class DummySerializer implements ISerializer + { + + @Override + public byte[] serialize(Object obj) + { + ByteArrayOutputStream bos = null; + ObjectOutput out = null; + try + { + bos = new ByteArrayOutputStream(); + out = new ObjectOutputStream(bos); + out.writeObject(obj); + return bos.toByteArray(); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (bos != null) + bos.close(); + if (out != null) + out.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + @Override + public Object deserialize(byte[] bytes) + { + ByteArrayInputStream bis = null; + ObjectInput in = null; + try + { + bis = new ByteArrayInputStream(bytes); + in = new ObjectInputStream(bis); + return in.readObject(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (bis != null) + bis.close(); + if (in != null) + in.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + } + + private class DummyPageStore implements IPageStore + { + + private File folder; + + private DummyPageStore(File folder) + { + folder.mkdirs(); + this.folder = folder; + } + + private File getPageFile(String sessionId, int pageId) + { + return new File(folder.getAbsolutePath() + "/" + sessionId + "-" + pageId + ".page"); + } + + private void toFile(Object obj, File file) + { + FileOutputStream fos = null; + ObjectOutput oo = null; + try + { + fos = new FileOutputStream(file); + oo = new ObjectOutputStream(fos); + oo.writeObject(obj); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (fos != null) + fos.close(); + if (oo != null) + oo.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + private Object fromFile(File file) + { + FileInputStream fis = null; + ObjectInput oi = null; + try + { + fis = new FileInputStream(file); + oi = new ObjectInputStream(fis); + return oi.readObject(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (fis != null) + fis.close(); + if (oi != null) + oi.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + @Override + public void destroy() + { + } + + @Override + public IManageablePage getPage(String sessionId, int pageId) + { + return (IManageablePage)fromFile(getPageFile(sessionId, pageId)); + } + + @Override + public void removePage(String sessionId, int pageId) + { + } + + @Override + public void storePage(String sessionId, IManageablePage page) + { + toFile(page, getPageFile(sessionId, page.getPageId())); + } + + @Override + public void unbind(String sessionId) + { + } + + @Override + public Serializable prepareForSerialization(String sessionId, Serializable page) + { + return null; + } + + @Override + public Object restoreAfterSerialization(Serializable serializable) + { + return null; + } + + @Override + public IManageablePage convertToPage(Object page) + { + return null; + } + } + +}
