Repository: wicket Updated Branches: refs/heads/master c45af373c -> 312bddbc4
WICKET-6177 Blocking page serialization Project: http://git-wip-us.apache.org/repos/asf/wicket/repo Commit: http://git-wip-us.apache.org/repos/asf/wicket/commit/2ffd587d Tree: http://git-wip-us.apache.org/repos/asf/wicket/tree/2ffd587d Diff: http://git-wip-us.apache.org/repos/asf/wicket/diff/2ffd587d Branch: refs/heads/master Commit: 2ffd587dea4c5e1edec0853f35a9b5761e391917 Parents: e8aea22 Author: manuelbarzi <[email protected]> Authored: Sun Feb 12 02:23:40 2017 +0100 Committer: manuelbarzi <[email protected]> Committed: Sun Feb 12 02:23:40 2017 +0100 ---------------------------------------------------------------------- pom.xml | 10 + wicket-core/pom.xml | 10 + .../apache/wicket/pageStore/AsyncPageStore.java | 373 ++++++++++++++ .../wicket/pageStore/AsyncPageStoreTest.java | 497 +++++++++++++++++++ 4 files changed, 890 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/wicket/blob/2ffd587d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d5beb05..80e236d 100644 --- a/pom.xml +++ b/pom.xml @@ -151,6 +151,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>21.0</version> + </dependency> + <dependency> <groupId>javax.el</groupId> <artifactId>javax.el-api</artifactId> <version>3.0.1-b04</version> @@ -242,6 +247,11 @@ <version>4.1</version> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.5</version> + </dependency> + <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity</artifactId> <version>1.7</version> http://git-wip-us.apache.org/repos/asf/wicket/blob/2ffd587d/wicket-core/pom.xml ---------------------------------------------------------------------- diff --git a/wicket-core/pom.xml b/wicket-core/pom.xml index bcc8d1d..a1ee73d 100644 --- a/wicket-core/pom.xml +++ b/wicket-core/pom.xml @@ -66,6 +66,16 @@ <groupId>com.tdunning</groupId> <artifactId>json</artifactId> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <pluginManagement> http://git-wip-us.apache.org/repos/asf/wicket/blob/2ffd587d/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java new file mode 100644 index 0000000..213164d --- /dev/null +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsyncPageStore.java @@ -0,0 +1,373 @@ +package org.apache.wicket.pageStore; + +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.wicket.page.IManageablePage; +import org.apache.wicket.util.lang.Args; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Facade for {@link IPageStore} that does the actual saving in worker thread. + * <p> + * Creates an {@link Entry} for each double (sessionId, page) and puts it in {@link #entries} queue + * if there is room. Acts as producer.<br/> + * Later {@link PageSavingRunnable} reads in blocking manner from {@link #entries} and saves each + * entry. Acts as consumer. + * </p> + * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page + * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore}, + * though it may happen in the extreme case when the queue is full. These cases should be avoided. + * + * Based on AsynchronousDataStore (@author Matej Knopp). + * + * @author manuelbarzi + */ +public class AsyncPageStore implements IPageStore +{ + + /** Log for reporting. */ + private static final Logger log = LoggerFactory.getLogger(AsyncPageStore.class); + + /** + * The time to wait when adding an {@link Entry} into the entries. In millis. + */ + private static final long OFFER_WAIT = 30L; + + /** + * The time to wait for an entry to save with the wrapped {@link IPageStore} . In millis. + */ + private static final long POLL_WAIT = 1000L; + + /** + * The page saving thread. + */ + private final Thread pageSavingThread; + + /** + * The wrapped {@link IPageStore} that actually stores that pages + */ + private final IPageStore pageStore; + + /** + * The queue where the entries which have to be saved are temporary stored + */ + private final BlockingQueue<Entry> entries; + + /** + * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval of {@link Entry}s which + * are not yet stored by the wrapped {@link IPageStore} + */ + private final ConcurrentMap<String, Entry> entryMap; + + /** + * Construct. + * + * @param pageStore + * the wrapped {@link IPageStore} that actually saved the page + * @param capacity + * the capacity of the queue that delays the saving + */ + public AsyncPageStore(final IPageStore pageStore, final int capacity) + { + this.pageStore = pageStore; + entries = new LinkedBlockingQueue<Entry>(capacity); + entryMap = new ConcurrentHashMap<String, Entry>(); + + PageSavingRunnable savingRunnable = new PageSavingRunnable(pageStore, entries, entryMap); + pageSavingThread = new Thread(savingRunnable, "AsyncPageStore-PageSavingThread"); + pageSavingThread.setDaemon(true); + pageSavingThread.start(); + } + + /** + * Little helper + * + * @param sessionId + * @param pageId + * @return Entry + */ + private Entry getEntry(final String sessionId, final int pageId) + { + return entryMap.get(getKey(sessionId, pageId)); + } + + /** + * + * @param pageId + * @param sessionId + * @return generated key + */ + private static String getKey(final String sessionId, final int pageId) + { + return pageId + ":::" + sessionId; + } + + /** + * + * @param entry + * @return generated key + */ + private static String getKey(final Entry entry) + { + return getKey(entry.sessionId, entry.page.getPageId()); + } + + /** + * The structure used for an entry in the queue + */ + private static class Entry + { + private final String sessionId; + private final IManageablePage page; + + public Entry(final String sessionId, final IManageablePage page) + { + this.sessionId = Args.notNull(sessionId, "sessionId"); + this.page = Args.notNull(page, "page"); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + page.getPageId(); + result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Entry other = (Entry)obj; + if (page.getPageId() != other.page.getPageId()) + return false; + if (sessionId == null) + { + if (other.sessionId != null) + return false; + } + else if (!sessionId.equals(other.sessionId)) + return false; + return true; + } + + @Override + public String toString() + { + return "Entry [sessionId=" + sessionId + ", pageId=" + page.getPageId() + "]"; + } + + } + + /** + * The thread that acts as consumer of {@link Entry}ies + */ + private static class PageSavingRunnable implements Runnable + { + private static final Logger log = LoggerFactory.getLogger(PageSavingRunnable.class); + + private final BlockingQueue<Entry> entries; + + private final ConcurrentMap<String, Entry> entryMap; + + private final IPageStore pageStore; + + private PageSavingRunnable(IPageStore pageStore, BlockingQueue<Entry> entries, + ConcurrentMap<String, Entry> entryMap) + { + this.pageStore = pageStore; + this.entries = entries; + this.entryMap = entryMap; + } + + @Override + public void run() + { + while (!Thread.interrupted()) + { + Entry entry = null; + try + { + entry = entries.poll(POLL_WAIT, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + + if (entry != null) + { + log.debug("Saving asynchronously: {}...", entry); + pageStore.storePage(entry.sessionId, entry.page); + entryMap.remove(getKey(entry)); + } + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#destroy() + */ + @Override + public void destroy() + { + if (pageSavingThread.isAlive()) + { + pageSavingThread.interrupt(); + try + { + pageSavingThread.join(); + } + catch (InterruptedException e) + { + log.error(e.getMessage(), e); + } + } + + pageStore.destroy(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#getPage(java.lang.String, int) + */ + @Override + public IManageablePage getPage(String sessionId, int pageId) + { + Entry entry = getEntry(sessionId, pageId); + if (entry != null) + { + log.debug( + "Returning the page of a non-stored entry with session id '{}' and page id '{}'", + sessionId, pageId); + return entry.page; + } + IManageablePage page = pageStore.getPage(sessionId, pageId); + + log.debug("Returning the page of a stored entry with session id '{}' and page id '{}'", + sessionId, pageId); + + return page; + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#removePage(java.lang.String, int) + */ + @Override + public void removePage(String sessionId, int pageId) + { + String key = getKey(sessionId, pageId); + if (key != null) + { + Entry entry = entryMap.remove(key); + if (entry != null) + { + entries.remove(entry); + } + } + + pageStore.removePage(sessionId, pageId); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#storePage(java.lang.String, + * org.apache.wicket.page.IManageablePage) + */ + @Override + public void storePage(String sessionId, IManageablePage page) + { + Entry entry = new Entry(sessionId, page); + String key = getKey(entry); + entryMap.put(key, entry); + + try + { + if (entries.offer(entry, OFFER_WAIT, TimeUnit.MILLISECONDS)) + { + log.debug("Offered for storing asynchronously page with id '{}' in session '{}'", + page.getPageId(), sessionId); + } + else + { + log.debug("Storing synchronously page with id '{}' in session '{}'", + page.getPageId(), sessionId); + entryMap.remove(key); + pageStore.storePage(sessionId, page); + } + } + catch (InterruptedException e) + { + log.error(e.getMessage(), e); + entryMap.remove(key); + pageStore.storePage(sessionId, page); + } + + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#unbind(java.lang.String) + */ + @Override + public void unbind(String sessionId) + { + pageStore.unbind(sessionId); + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#prepareForSerialization(java.lang. String, + * java.io.Serializable) + */ + @Override + public Serializable prepareForSerialization(String sessionId, Serializable page) + { + return pageStore.prepareForSerialization(sessionId, page); + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#restoreAfterSerialization(java.io. Serializable) + */ + @Override + public Object restoreAfterSerialization(Serializable serializable) + { + return pageStore.restoreAfterSerialization(serializable); + } + + /* + * (non-Javadoc) + * + * @see org.apache.wicket.pageStore.IPageStore#convertToPage(java.lang.Object) + */ + @Override + public IManageablePage convertToPage(Object page) + { + return pageStore.convertToPage(page); + } + +} http://git-wip-us.apache.org/repos/asf/wicket/blob/2ffd587d/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java b/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java new file mode 100644 index 0000000..9849471 --- /dev/null +++ b/wicket-core/src/test/java/org/apache/wicket/pageStore/AsyncPageStoreTest.java @@ -0,0 +1,497 @@ +package org.apache.wicket.pageStore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.wicket.page.IManageablePage; +import org.apache.wicket.serialize.ISerializer; +import org.apache.wicket.serialize.java.DeflatedJavaSerializer; +import org.apache.wicket.util.file.File; +import org.apache.wicket.util.lang.Bytes; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; + +/** + * AsyncPageStoreTest + * + * @author manuelbarzi + * + */ +public class AsyncPageStoreTest +{ + + /** Log for reporting. */ + private static final Logger log = LoggerFactory.getLogger(AsyncPageStoreTest.class); + + @SuppressWarnings("serial") + private static class DummyPage implements IManageablePage + { + + private int pageId; + private long writeMillis; + private long readMillis; + private String sessionId; + + private DummyPage(int pageId, long writeMillis, long readMillis, String sessionId) + { + this.pageId = pageId; + this.writeMillis = writeMillis; + this.readMillis = readMillis; + this.sessionId = sessionId; + } + + @Override + public boolean isPageStateless() + { + return false; + } + + @Override + public int getPageId() + { + return pageId; + } + + @Override + public void detach() + { + } + + @Override + public boolean setFreezePageId(boolean freeze) + { + return false; + } + + /** + * @param s + * @throws IOException + */ + private void writeObject(java.io.ObjectOutputStream s) throws IOException + { + log.debug("serializing page {} for {}ms (session {})", getPageId(), writeMillis, + sessionId); + try + { + Thread.sleep(writeMillis); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + s.writeInt(pageId); + s.writeLong(writeMillis); + s.writeLong(readMillis); + s.writeObject(sessionId); + } + + private void readObject(java.io.ObjectInputStream s) + throws IOException, ClassNotFoundException + { + log.debug("deserializing page {} for {}ms (session {})", getPageId(), writeMillis, + sessionId); + try + { + Thread.sleep(readMillis); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + pageId = s.readInt(); + writeMillis = s.readLong(); + readMillis = s.readLong(); + sessionId = (String)s.readObject(); + } + + public String toString() + { + return "DummyPage[pageId = " + pageId + ", writeMillis = " + writeMillis + + ", readMillis = " + readMillis + ", sessionId = " + sessionId + ", hashCode = " + + hashCode() + "]"; + } + } + + /** + * Store works fully async when number of pages handled never exceeds the async-storage + * capacity. + * + * @throws InterruptedException + */ + @Test + public void storeBehavesAsyncWhenNotExceedingStoreCapacity() throws InterruptedException + { + int sessions = 2; + int pages = 5; + long writeMillis = 2000; + long readMillis = 1500; + int asyncPageStoreCapacity = pages * sessions; + + List<Metrics> results = runTest(sessions, pages, writeMillis, readMillis, + asyncPageStoreCapacity); + + for (Metrics metrics : results) + System.out.println(metrics); + + for (Metrics metrics : results) + { + assertEquals(metrics.storedPage, metrics.restoredPage); + assertTrue(metrics.storingMillis < writeMillis); + assertTrue(metrics.restoringMillis < readMillis); + } + } + + /** + * Store behaves sync from when number of pages handled exceeds the given async-storage + * capacity. It works async until the number of pages reaches the limit (capacity). + * + * @throws InterruptedException + */ + @Test + public void storeBehavesSyncFromWhenExceedingStoreCapacity() throws InterruptedException + { + int sessions = 2; + int pages = 5; + long writeMillis = 2000; + long readMillis = 1500; + int asyncPageStoreCapacity = pages; // only up to the first round of + // pages + + List<Metrics> results = runTest(sessions, pages, writeMillis, readMillis, + asyncPageStoreCapacity); + + for (Metrics metrics : results) + System.out.println(metrics); + + int sync = 0; + + for (int i = 0; i < results.size(); i++) + { + Metrics metrics = results.get(i); + + assertEquals(metrics.storedPage.sessionId, metrics.restoredPage.sessionId); + assertEquals(metrics.storedPage.getPageId(), metrics.restoredPage.getPageId()); + + if (!metrics.storedPage.equals(metrics.restoredPage)) + { + assertTrue(metrics.storingMillis >= metrics.storedPage.writeMillis); + sync++; + } + } + + assertTrue(sync > 0); + } + + // test run + + private class Metrics + { + private DummyPage storedPage; + private long storingMillis; + private DummyPage restoredPage; + private long restoringMillis; + + public String toString() + { + return "Metrics[storedPage = " + storedPage + ", storingMillis = " + storingMillis + + ", restoredPage = " + restoredPage + ", restoringMillis = " + restoringMillis + "]"; + } + } + + private List<Metrics> runTest(int sessions, int pages, long writeMillis, long readMillis, + int asyncPageStoreCapacity) throws InterruptedException + { + + List<Metrics> results = new ArrayList<>(); + + final CountDownLatch lock = new CountDownLatch(pages * sessions); + + ISerializer serializer = new DeflatedJavaSerializer("applicationKey"); + // ISerializer serializer = new DummySerializer(); + IDataStore dataStore = new DiskDataStore("applicationName", new File("./target"), + Bytes.bytes(10000l)); + IPageStore pageStore = new DefaultPageStore(serializer, dataStore, 0) + { + // IPageStore pageStore = new DummyPageStore(new + // File("target/store")) { + + @Override + public void storePage(String sessionId, IManageablePage page) + { + + super.storePage(sessionId, page); + + lock.countDown(); + } + }; + + IPageStore asyncPageStore = new AsyncPageStore(pageStore, asyncPageStoreCapacity); + + Stopwatch stopwatch = Stopwatch.createUnstarted(); + + for (int pageId = 1; pageId <= pages; pageId++) + { + for (int sessionId = 1; sessionId <= sessions; sessionId++) + { + String session = String.valueOf(sessionId); + Metrics metrics = new Metrics(); + + stopwatch.reset(); + DummyPage page = new DummyPage(pageId, around(writeMillis), around(readMillis), + session); + stopwatch.start(); + asyncPageStore.storePage(session, page); + metrics.storedPage = page; + metrics.storingMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + stopwatch.reset(); + stopwatch.start(); + metrics.restoredPage = DummyPage.class + .cast(asyncPageStore.getPage(session, pageId)); + metrics.restoringMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + results.add(metrics); + } + } + + lock.await(pages * sessions * (writeMillis + readMillis), TimeUnit.MILLISECONDS); + + return results; + } + + private long around(long target) + { + return RandomUtils.nextLong((long)(target * .9), (long)(target * 1.1)); + } + + // other aux dummy impls for testing + + private class DummySerializer implements ISerializer + { + + @Override + public byte[] serialize(Object obj) + { + ByteArrayOutputStream bos = null; + ObjectOutput out = null; + try + { + bos = new ByteArrayOutputStream(); + out = new ObjectOutputStream(bos); + out.writeObject(obj); + return bos.toByteArray(); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (bos != null) + bos.close(); + if (out != null) + out.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + @Override + public Object deserialize(byte[] bytes) + { + ByteArrayInputStream bis = null; + ObjectInput in = null; + try + { + bis = new ByteArrayInputStream(bytes); + in = new ObjectInputStream(bis); + return in.readObject(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (bis != null) + bis.close(); + if (in != null) + in.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + } + + private class DummyPageStore implements IPageStore + { + + private File folder; + + private DummyPageStore(File folder) + { + folder.mkdirs(); + this.folder = folder; + } + + private File getPageFile(String sessionId, int pageId) + { + return new File(folder.getAbsolutePath() + "/" + sessionId + "-" + pageId + ".page"); + } + + private void toFile(Object obj, File file) + { + FileOutputStream fos = null; + ObjectOutput oo = null; + try + { + fos = new FileOutputStream(file); + oo = new ObjectOutputStream(fos); + oo.writeObject(obj); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (fos != null) + fos.close(); + if (oo != null) + oo.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + private Object fromFile(File file) + { + FileInputStream fis = null; + ObjectInput oi = null; + try + { + fis = new FileInputStream(file); + oi = new ObjectInputStream(fis); + return oi.readObject(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + finally + { + try + { + if (fis != null) + fis.close(); + if (oi != null) + oi.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + @Override + public void destroy() + { + } + + @Override + public IManageablePage getPage(String sessionId, int pageId) + { + return (IManageablePage)fromFile(getPageFile(sessionId, pageId)); + } + + @Override + public void removePage(String sessionId, int pageId) + { + } + + @Override + public void storePage(String sessionId, IManageablePage page) + { + toFile(page, getPageFile(sessionId, page.getPageId())); + } + + @Override + public void unbind(String sessionId) + { + } + + @Override + public Serializable prepareForSerialization(String sessionId, Serializable page) + { + return null; + } + + @Override + public Object restoreAfterSerialization(Serializable serializable) + { + return null; + } + + @Override + public IManageablePage convertToPage(Object page) + { + return null; + } + } + +}
