http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java deleted file mode 100644 index f749354..0000000 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.wicket.pageStore; - -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.wicket.util.lang.Args; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Facade for {@link IDataStore} that does the actual saving in worker thread. - * <p> - * Creates an {@link Entry} for each triple (sessionId, pageId, data) and puts it in - * {@link #entries} queue if there is room. Acts as producer.<br/> - * Later {@link PageSavingRunnable} reads in blocking manner from {@link #entries} and saves each - * entry. Acts as consumer. - * </p> - * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page - * storing asynchronous. We don't want to write concurrently in the wrapped {@link IDataStore}, - * though it may happen in the extreme case when the queue is full. These cases should be avoided. - * - * @author Matej Knopp - */ -public class AsynchronousDataStore implements IDataStore -{ - /** Log for reporting. */ - private static final Logger log = LoggerFactory.getLogger(AsynchronousDataStore.class); - - /** - * The time to wait when adding an {@link Entry} into the entries. In millis. - */ - private static final long OFFER_WAIT = 30L; - - /** - * The time to wait for an entry to save with the wrapped {@link IDataStore}. In millis. - */ - private static final long POLL_WAIT = 1000L; - - /** - * The page saving thread. - */ - private Thread pageSavingThread; - - /** - * The wrapped {@link IDataStore} that actually stores that pages - */ - private final IDataStore dataStore; - - /** - * The queue where the entries which have to be saved are temporary stored - */ - private final BlockingQueue<Entry> entries; - - /** - * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval of {@link Entry}s which - * are not yet stored by the wrapped {@link IDataStore} - */ - private final ConcurrentMap<String, Entry> entryMap; - - /** - * Construct. - * - * @param dataStore - * the wrapped {@link IDataStore} that actually saved the data - * @param capacity - * the capacity of the queue that delays the saving - */ - public AsynchronousDataStore(final IDataStore dataStore, final int capacity) - { - this.dataStore = dataStore; - entries = new LinkedBlockingQueue<>(capacity); - entryMap = new ConcurrentHashMap<>(); - - pageSavingThread = new Thread(new PageSavingRunnable(), "Wicket-AsyncDataStore-PageSavingThread"); - pageSavingThread.setDaemon(true); - pageSavingThread.start(); - } - - @Override - public void destroy() - { - final Thread thread = pageSavingThread; - pageSavingThread = null; - if (thread != null && thread.isAlive()) - { - try - { - thread.join(); - } catch (InterruptedException e) - { - log.error(e.getMessage(), e); - } - } - - dataStore.destroy(); - } - - /** - * Little helper - * - * @param sessionId - * @param id - * @return Entry - */ - private Entry getEntry(final String sessionId, final int id) - { - return entryMap.get(getKey(sessionId, id)); - } - - @Override - public byte[] getData(final String sessionId, final int id) - { - Entry entry = getEntry(sessionId, id); - if (entry != null) - { - log.debug( - "Returning the data of a non-stored entry with sessionId '{}' and pageId '{}'", - sessionId, id); - return entry.data; - } - byte[] data = dataStore.getData(sessionId, id); - - log.debug("Returning the data of a stored entry with sessionId '{}' and pageId '{}'", - sessionId, id); - - return data; - } - - @Override - public boolean isReplicated() - { - return dataStore.isReplicated(); - } - - @Override - public void removeData(final String sessionId, final int id) - { - String key = getKey(sessionId, id); - if (key != null) - { - Entry entry = entryMap.remove(key); - if (entry != null) - { - entries.remove(entry); - } - } - - dataStore.removeData(sessionId, id); - } - - @Override - public void removeData(final String sessionId) - { - for (Iterator<Entry> itor = entries.iterator(); itor.hasNext();) - { - Entry entry = itor.next(); - if (entry != null) // this check is not needed in JDK6 - { - String entrySessionId = entry.sessionId; - - if (sessionId.equals(entrySessionId)) - { - entryMap.remove(getKey(entry)); - itor.remove(); - } - } - } - - dataStore.removeData(sessionId); - } - - /** - * Save the entry in the queue if there is a room or directly pass it to the wrapped - * {@link IDataStore} if there is no such - * - * @see org.apache.wicket.pageStore.IDataStore#storeData(java.lang.String, int, byte[]) - */ - @Override - public void storeData(final String sessionId, final int id, final byte[] data) - { - if (pageSavingThread == null) - { - return; - } - Entry entry = new Entry(sessionId, id, data); - String key = getKey(entry); - entryMap.put(key, entry); - - try - { - boolean added = entries.offer(entry, OFFER_WAIT, TimeUnit.MILLISECONDS); - - if (added == false) - { - log.debug("Storing synchronously data with id '{}' in session '{}'", id, sessionId); - entryMap.remove(key); - dataStore.storeData(sessionId, id, data); - } - } - catch (InterruptedException e) - { - log.error(e.getMessage(), e); - if (pageSavingThread != null) - { - entryMap.remove(key); - dataStore.storeData(sessionId, id, data); - } - } - } - - /** - * - * @param pageId - * @param sessionId - * @return generated key - */ - private static String getKey(final String sessionId, final int pageId) - { - return pageId + ":::" + sessionId; - } - - /** - * - * @param entry - * @return generated key - */ - private static String getKey(final Entry entry) - { - return getKey(entry.sessionId, entry.pageId); - } - - /** - * The structure used for an entry in the queue - */ - private static class Entry - { - private final String sessionId; - private final int pageId; - private final byte data[]; - - public Entry(final String sessionId, final int pageId, final byte data[]) - { - this.sessionId = Args.notNull(sessionId, "sessionId"); - this.pageId = pageId; - this.data = Args.notNull(data, "data"); - } - - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = prime * result + pageId; - result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Entry other = (Entry)obj; - if (pageId != other.pageId) - return false; - if (sessionId == null) - { - if (other.sessionId != null) - return false; - } - else if (!sessionId.equals(other.sessionId)) - return false; - return true; - } - - @Override - public String toString() - { - return "Entry [sessionId=" + sessionId + ", pageId=" + pageId + "]"; - } - - } - - /** - * The thread that acts as consumer of {@link Entry}ies - */ - private class PageSavingRunnable implements Runnable - { - @Override - public void run() - { - while (pageSavingThread != null) - { - Entry entry = null; - try - { - entry = entries.poll(POLL_WAIT, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - log.debug("PageSavingRunnable:: Interrupted..."); - } - - if (entry != null && pageSavingThread != null) - { - log.debug("PageSavingRunnable:: Saving asynchronously: {}...", entry); - dataStore.storeData(entry.sessionId, entry.pageId, entry.data); - entryMap.remove(getKey(entry)); - } - } - } - } - - @Override - public final boolean canBeAsynchronous() - { - // should not wrap in another AsynchronousDataStore - return false; - } -}
http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java index c24b2a0..0d23925 100644 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java @@ -17,41 +17,43 @@ package org.apache.wicket.pageStore; import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.wicket.MetaDataKey; +import org.apache.wicket.WicketRuntimeException; import org.apache.wicket.page.IManageablePage; import org.apache.wicket.util.lang.Args; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Facade for {@link IPageStore} that does the actual saving in worker thread. + * Facade for {@link IPageStore} moving {@link #addPage(IPageContext, IManageablePage)} to a worker thread. * <p> - * Creates an {@link Entry} for each double (sessionId, page) and puts it in {@link #entries} queue - * if there is room. Acts as producer.<br/> - * Later {@link PageSavingRunnable} reads in blocking manner from {@link #entries} and saves each - * entry. Acts as consumer. - * </p> - * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page + * Creates an {@link PendingAdd} for {@link #addPage(IPageContext, IManageablePage)} and puts ito a {@link #queue}. + * Later {@link PageAddingRunnable} reads in blocking manner from {@link #queue} and performs the add. + * <p> + * It starts only one instance of {@link PageAddingRunnable} because all we need is to make the page * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore}, * though it may happen in the extreme case when the queue is full. These cases should be avoided. - * - * Based on AsynchronousDataStore (@author Matej Knopp). - * + * + * @author Matej Knopp * @author manuelbarzi */ -public class AsynchronousPageStore implements IPageStore +public class AsynchronousPageStore extends DelegatingPageStore { /** Log for reporting. */ private static final Logger log = LoggerFactory.getLogger(AsynchronousPageStore.class); /** - * The time to wait when adding an {@link Entry} into the entries. In millis. + * The time to wait when adding an {@link PendingAdd} into the entries. In millis. */ private static final long OFFER_WAIT = 30L; @@ -63,27 +65,22 @@ public class AsynchronousPageStore implements IPageStore /** * The page saving thread. */ - private Thread pageSavingThread; - - /** - * The wrapped {@link IPageStore} that actually stores that pages - */ - private final IPageStore delegate; + private final Thread pageSavingThread; /** * The queue where the entries which have to be saved are temporary stored */ - private final BlockingQueue<Entry> entries; + private final BlockingQueue<PendingAdd> queue; /** - * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval of {@link Entry}s which + * A map 'sessionId:::pageId' -> {@link PendingAdd}. Used for fast retrieval of {@link PendingAdd}s which * are not yet stored by the wrapped {@link IPageStore} */ - private final ConcurrentMap<String, Entry> entryMap; + private final ConcurrentMap<String, PendingAdd> queueMap; /** * Construct. - * + * * @param delegate * the wrapped {@link IPageStore} that actually saved the page * @param capacity @@ -91,29 +88,19 @@ public class AsynchronousPageStore implements IPageStore */ public AsynchronousPageStore(final IPageStore delegate, final int capacity) { - this.delegate = Args.notNull(delegate, "delegate"); - entries = new LinkedBlockingQueue<>(capacity); - entryMap = new ConcurrentHashMap<>(); + super(delegate); + + queue = new LinkedBlockingQueue<>(capacity); + queueMap = new ConcurrentHashMap<>(); - pageSavingThread = new Thread(new PageSavingRunnable(), "Wicket-AsyncPageStore-PageSavingThread"); + PageAddingRunnable savingRunnable = new PageAddingRunnable(delegate, queue, queueMap); + pageSavingThread = new Thread(savingRunnable, "Wicket-AsyncPageStore-PageSavingThread"); pageSavingThread.setDaemon(true); pageSavingThread.start(); } /** - * Little helper - * - * @param sessionId - * @param pageId - * @return Entry - */ - private Entry getEntry(final String sessionId, final int pageId) - { - return entryMap.get(getKey(sessionId, pageId)); - } - - /** - * + * * @param pageId * @param sessionId * @return generated key @@ -124,89 +111,210 @@ public class AsynchronousPageStore implements IPageStore } /** - * - * @param entry - * @return generated key + * An add of a page that is pending its asynchronous execution. + * <p> + * Used as an isolating {@link IPageContext} for the delegation to + * {@link IPageStore#addPage(IPageContext, IManageablePage)}. */ - private static String getKey(final Entry entry) + private static class PendingAdd implements IPageContext { - return getKey(entry.sessionId, entry.page.getPageId()); - } + private final IPageContext context; + + private final IManageablePage page; - /** - * The structure used for an entry in the queue - */ - private static class Entry - { private final String sessionId; - private final IManageablePage page; - public Entry(final String sessionId, final IManageablePage page) + /** + * Is this context passed to an asynchronously called {@link IPageStore#addPage(IPageContext, IManageablePage)}. + */ + private boolean asynchronous = false; + + /** + * Cache of session attributes which may filled in {@link IPageStore#canBeAsynchronous(IPageContext)}, + * so these are available asynchronously later on. + */ + private final Map<String, Serializable> attributeCache = new HashMap<>(); + + public PendingAdd(final IPageContext context, final IManageablePage page) { - this.sessionId = Args.notNull(sessionId, "sessionId"); + this.context = Args.notNull(context, "context"); this.page = Args.notNull(page, "page"); + + context.bind(); + this.sessionId = context.getSessionId(); + } + + /** + * + * @param entry + * @return generated key + */ + private String getKey() + { + return AsynchronousPageStore.getKey(sessionId, page.getPageId()); } @Override - public int hashCode() + public String toString() { - final int prime = 31; - int result = 1; - result = prime * result + page.getPageId(); - result = prime * result + sessionId.hashCode(); - return result; + return "PendingAdd [sessionId=" + sessionId + ", pageId=" + page.getPageId() + "]"; } + /** + * Prevents access to request when called asynchronously. + */ @Override - public boolean equals(Object obj) + public <T> void setRequestData(MetaDataKey<T> key, T value) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Entry other = (Entry)obj; - if (page.getPageId() != other.page.getPageId()) - return false; - if (!sessionId.equals(other.sessionId)) - return false; - return true; + if (asynchronous) { + throw new WicketRuntimeException("no request available asynchronuously"); + } + + context.setRequestData(key, value); } + /** + * Prevents access to request when called asynchronously. + */ @Override - public String toString() + public <T> T getRequestData(MetaDataKey<T> key) + { + if (asynchronous) { + throw new WicketRuntimeException("no request available asynchronuously"); + } + + return context.getRequestData(key); + } + + /** + * Prevents access to the session when called asynchronously. + * <p> + * All values set from {@link IPageStore#canBeAsynchronous(IPageContext)} are kept + * for later retrieval. + */ + @Override + public <T extends Serializable> void setSessionAttribute(String key, T value) + { + if (asynchronous) { + throw new WicketRuntimeException("no session available asynchronuously"); + } + + if (value != null) { + attributeCache.put(key, value); + } + + context.setSessionAttribute(key, value); + } + + /** + * Prevents access to the session when called asynchronously. + * <p> + * All values set from {@link IPageStore#canBeAsynchronous(IPageContext)} are still + * available. + */ + @SuppressWarnings("unchecked") + @Override + public <T extends Serializable> T getSessionAttribute(String key) + { + if (asynchronous) { + T value = (T)attributeCache.get(key); + if (value != null) { + return value; + } + + throw new WicketRuntimeException("no session available asynchronuously"); + } + + T value = context.getSessionAttribute(key); + if (value != null) { + attributeCache.put(key, value); + } + + return value; + } + + /** + * Prevents access to the session when called asynchronously. + */ + @Override + public <T extends Serializable> T setSessionData(MetaDataKey<T> key, T value) + { + if (asynchronous) { + throw new WicketRuntimeException("no session available asynchronuously"); + } + + return context.setSessionData(key, value); + } + + /** + * Gets data from the session. + */ + @Override + public <T extends Serializable> T getSessionData(MetaDataKey<T> key) { - return "Entry [sessionId=" + sessionId + ", pageId=" + page.getPageId() + "]"; + return context.getSessionData(key); } + /** + * Has no effect if already bound. + */ + @Override + public void bind() + { + } + + /** + * Returns id of session. + */ + @Override + public String getSessionId() + { + return sessionId; + } } /** - * The thread that acts as consumer of {@link Entry}ies + * The consumer of {@link PendingAdd}s. */ - private class PageSavingRunnable implements Runnable + private static class PageAddingRunnable implements Runnable { + private static final Logger log = LoggerFactory.getLogger(PageAddingRunnable.class); + + private final BlockingQueue<PendingAdd> entries; + + private final ConcurrentMap<String, PendingAdd> addQueue; + + private final IPageStore delegate; + + private PageAddingRunnable(IPageStore delegate, BlockingQueue<PendingAdd> entries, + ConcurrentMap<String, PendingAdd> entryMap) + { + this.delegate = delegate; + this.entries = entries; + this.addQueue = entryMap; + } + @Override public void run() { - while (pageSavingThread != null) + while (!Thread.interrupted()) { - Entry entry = null; + PendingAdd add = null; try { - entry = entries.poll(POLL_WAIT, TimeUnit.MILLISECONDS); + add = entries.poll(POLL_WAIT, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - log.debug("PageSavingRunnable:: Interrupted..."); + Thread.currentThread().interrupt(); } - if (entry != null && pageSavingThread != null) + if (add != null) { - log.debug("PageSavingRunnable:: Saving asynchronously: {}...", entry); - delegate.storePage(entry.sessionId, entry.page); - entryMap.remove(getKey(entry)); + log.debug("Saving asynchronously: {}...", add); + add.asynchronous = true; + delegate.addPage(add, add.page); + addQueue.remove(add.getKey()); } } } @@ -215,122 +323,98 @@ public class AsynchronousPageStore implements IPageStore @Override public void destroy() { - final Thread thread = pageSavingThread; - pageSavingThread = null; - if (thread != null && thread.isAlive()) + if (pageSavingThread.isAlive()) { + pageSavingThread.interrupt(); try { - thread.join(); + pageSavingThread.join(); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } - delegate.destroy(); + + super.destroy(); } @Override - public IManageablePage getPage(String sessionId, int pageId) + public IManageablePage getPage(IPageContext context, int pageId) { - Entry entry = getEntry(sessionId, pageId); + PendingAdd entry = queueMap.get(getKey(context.getSessionId(), pageId)); if (entry != null) { - log.debug( - "Returning the page of a non-stored entry with session id '{}' and page id '{}'", - sessionId, pageId); + log.debug("Returning the page of a non-stored entry with page id '{}'", pageId); return entry.page; } - IManageablePage page = delegate.getPage(sessionId, pageId); + IManageablePage page = super.getPage(context, pageId); - log.debug("Returning the page of a stored entry with session id '{}' and page id '{}'", - sessionId, pageId); + log.debug("Returning the page of a stored entry with page id '{}'", pageId); return page; } @Override - public void removePage(String sessionId, int pageId) + public void removePage(IPageContext context, IManageablePage page) { - String key = getKey(sessionId, pageId); + String key = getKey(context.getSessionId(), page.getPageId()); if (key != null) { - Entry entry = entryMap.remove(key); + PendingAdd entry = queueMap.remove(key); if (entry != null) { - entries.remove(entry); + queue.remove(entry); } } - delegate.removePage(sessionId, pageId); + super.removePage(context, page); } @Override - public void storePage(String sessionId, IManageablePage page) + public void addPage(IPageContext context, IManageablePage page) { - if (pageSavingThread == null) - { - return; - } - Entry entry = new Entry(sessionId, page); - String key = getKey(entry); - entryMap.put(key, entry); - - try - { - if (entries.offer(entry, OFFER_WAIT, TimeUnit.MILLISECONDS)) - { - log.debug("Offered for storing asynchronously page with id '{}' in session '{}'", - page.getPageId(), sessionId); - } - else + PendingAdd add = new PendingAdd(context, page); + if (getDelegate().canBeAsynchronous(add)) { + String key = add.getKey(); + queueMap.put(key, add); + try { - log.debug("Storing synchronously page with id '{}' in session '{}'", - page.getPageId(), sessionId); - entryMap.remove(key); - delegate.storePage(sessionId, page); + if (queue.offer(add, OFFER_WAIT, TimeUnit.MILLISECONDS)) + { + log.debug("Offered for storing asynchronously page with id '{}'", page.getPageId()); + return; + } + else + { + log.debug("Storing synchronously page with id '{}'", page.getPageId()); + queueMap.remove(key); + } } - } - catch (InterruptedException e) - { - log.error(e.getMessage(), e); - if (pageSavingThread != null) + catch (InterruptedException e) { - entryMap.remove(key); - delegate.storePage(sessionId, page); + log.error(e.getMessage(), e); + queueMap.remove(key); } + } else { + log.warn("Delegated page store '{}' can not be asynchronous", getDelegate().getClass().getName()); } + + super.addPage(context, page); } @Override - public void unbind(String sessionId) - { - delegate.unbind(sessionId); - } - - @Override - public Serializable prepareForSerialization(String sessionId, Serializable page) - { - return delegate.prepareForSerialization(sessionId, page); - } - - @Override - public Object restoreAfterSerialization(Serializable serializable) - { - return delegate.restoreAfterSerialization(serializable); - } - - @Override - public IManageablePage convertToPage(Object page) + public void removeAllPages(IPageContext context) { - return delegate.convertToPage(page); - } - - @Override - public boolean canBeAsynchronous() - { - // should not wrap in another AsynchronousPageStore - return false; + Iterator<PendingAdd> iterator = queue.iterator(); + while (iterator.hasNext()) { + PendingAdd add = iterator.next(); + + if (add.sessionId.equals(context.getSessionId())) { + iterator.remove(); + } + } + + super.removeAllPages(context); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageContext.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageContext.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageContext.java new file mode 100644 index 0000000..971866e --- /dev/null +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageContext.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.pageStore; + +import java.io.Serializable; + +import org.apache.wicket.MetaDataKey; +import org.apache.wicket.Session; +import org.apache.wicket.request.cycle.RequestCycle; + +/** + * Default page context using a {@link Session}. + * + * @author Juergen Donnerstag + * @author svenmeier + */ +public class DefaultPageContext implements IPageContext +{ + private Session session; + + public DefaultPageContext(Session session) { + this.session = Session.get(); + } + + /** + * @see org.apache.wicket.pageStore.IPageContext#bind() + */ + @Override + public void bind() + { + session.bind(); + } + + /** + * @see org.apache.wicket.pageStore.IPageContext#getSessionId() + */ + @Override + public String getSessionId() + { + return session.getId(); + } + + @SuppressWarnings("unchecked") + @Override + public <T extends Serializable> T getSessionAttribute(String key) + { + return (T)session.getAttribute(key); + } + + @Override + public <T extends Serializable> void setSessionAttribute(String key, T value) + { + session.setAttribute(key, value); + } + + @Override + public <T extends Serializable> T getSessionData(MetaDataKey<T> key) + { + return session.getMetaData(key); + } + + @Override + public <T extends Serializable> T setSessionData(MetaDataKey<T> key, T value) + { + synchronized (session) + { + T oldValue = session.getMetaData(key); + if (oldValue != null) { + return oldValue; + } + + session.setMetaData(key, value); + return value; + } + } + + @Override + public <T> T getRequestData(MetaDataKey<T> key) + { + RequestCycle requestCycle = RequestCycle.get(); + if (requestCycle == null) + { + throw new IllegalStateException("Not a request thread."); + } + return requestCycle.getMetaData(key); + } + + @Override + public <T> void setRequestData(MetaDataKey<T> key, T value) + { + RequestCycle requestCycle = RequestCycle.get(); + if (requestCycle == null) + { + throw new IllegalStateException("Not a request thread."); + } + requestCycle.setMetaData(key, value); + } +} http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageStore.java deleted file mode 100644 index f324b85..0000000 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/DefaultPageStore.java +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.wicket.pageStore; - -import java.io.Serializable; -import java.lang.ref.SoftReference; -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedDeque; - -import org.apache.wicket.page.IManageablePage; -import org.apache.wicket.serialize.ISerializer; -import org.apache.wicket.util.lang.Args; -import org.apache.wicket.util.lang.Objects; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The {@link IPageStore} that converts {@link IManageablePage} instances to {@link SerializedPage}s - * before passing them to the {@link IDataStore} to store them and the same in the opposite - * direction when loading {@link SerializedPage} from the data store. - * - */ -public class DefaultPageStore extends AbstractCachingPageStore<DefaultPageStore.SerializedPage> -{ - private static final Logger LOG = LoggerFactory.getLogger(DefaultPageStore.class); - - /** - * Construct. - * - * @param pageSerializer - * the {@link ISerializer} that will be used to convert pages from/to byte arrays - * @param dataStore - * the {@link IDataStore} that actually stores the pages - * @param cacheSize - * the number of pages to cache in memory before passing them to - * {@link IDataStore#storeData(String, int, byte[])} - */ - public DefaultPageStore(final ISerializer pageSerializer, final IDataStore dataStore, - final int cacheSize) - { - super(pageSerializer, dataStore, new SerializedPagesCache(cacheSize)); - } - - @Override - public void storePage(final String sessionId, final IManageablePage page) - { - SerializedPage serialized = createSerializedPage(sessionId, page); - if (serialized != null) - { - int pageId = page.getPageId(); - pagesCache.storePage(sessionId, pageId, serialized); - storePageData(sessionId, pageId, serialized.getData()); - } - } - - @Override - public IManageablePage convertToPage(final Object object) - { - if (object == null) - { - return null; - } - else if (object instanceof IManageablePage) - { - return (IManageablePage)object; - } - else if (object instanceof SerializedPage) - { - SerializedPage page = (SerializedPage)object; - byte data[] = page.getData(); - if (data == null) - { - data = getPageData(page.getSessionId(), page.getPageId()); - } - if (data != null) - { - return deserializePage(data); - } - return null; - } - - String type = object.getClass().getName(); - throw new IllegalArgumentException("Unknown object type " + type); - } - - /** - * Reloads the {@link SerializedPage} from the backing {@link IDataStore} if the - * {@link SerializedPage#data} is stripped earlier - * - * @param serializedPage - * the {@link SerializedPage} with empty {@link SerializedPage#data} slot - * @return the fully functional {@link SerializedPage} - */ - private SerializedPage restoreStrippedSerializedPage(final SerializedPage serializedPage) - { - SerializedPage result = pagesCache.getPage(serializedPage.getSessionId(), - serializedPage.getPageId()); - if (result != null) - { - return result; - } - - byte data[] = getPageData(serializedPage.getSessionId(), serializedPage.getPageId()); - return new SerializedPage(serializedPage.getSessionId(), serializedPage.getPageId(), data); - } - - @Override - public Serializable prepareForSerialization(final String sessionId, final Serializable page) - { - if (dataStore.isReplicated()) - { - return null; - } - - SerializedPage result = null; - - if (page instanceof IManageablePage) - { - IManageablePage _page = (IManageablePage)page; - result = pagesCache.getPage(sessionId, _page.getPageId()); - if (result == null) - { - result = createSerializedPage(sessionId, _page); - if (result != null) - { - pagesCache.storePage(sessionId, _page.getPageId(), result); - } - } - } - else if (page instanceof SerializedPage) - { - SerializedPage _page = (SerializedPage)page; - if (_page.getData() == null) - { - result = restoreStrippedSerializedPage(_page); - } - else - { - result = _page; - } - } - - if (result != null) - { - return result; - } - return page; - } - - /** - * - * @return Always true for this implementation - */ - protected boolean storeAfterSessionReplication() - { - return true; - } - - @Override - public Object restoreAfterSerialization(final Serializable serializable) - { - if (serializable == null) - { - return null; - } - else if (!storeAfterSessionReplication() || serializable instanceof IManageablePage) - { - return serializable; - } - else if (serializable instanceof SerializedPage) - { - SerializedPage page = (SerializedPage)serializable; - if (page.getData() != null) - { - storePageData(page.getSessionId(), page.getPageId(), page.getData()); - return new SerializedPage(page.getSessionId(), page.getPageId(), null); - } - return page; - } - - String type = serializable.getClass().getName(); - throw new IllegalArgumentException("Unknown object type " + type); - } - - /** - * A representation of {@link IManageablePage} that knows additionally the id of the http - * session in which this {@link IManageablePage} instance is used. The {@link #sessionId} and - * {@link #pageId} are used for better clustering in the {@link IDataStore} structures. - */ - protected static class SerializedPage implements Serializable - { - private static final long serialVersionUID = 1L; - - /** - * The id of the serialized {@link IManageablePage} - */ - private final int pageId; - - /** - * The id of the http session in which the serialized {@link IManageablePage} is used. - */ - private final String sessionId; - - /** - * The serialized {@link IManageablePage} - */ - private final byte[] data; - - public SerializedPage(String sessionId, int pageId, byte[] data) - { - this.pageId = pageId; - this.sessionId = sessionId; - this.data = data; - } - - public byte[] getData() - { - return data; - } - - public int getPageId() - { - return pageId; - } - - public String getSessionId() - { - return sessionId; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - { - return true; - } - if ((obj instanceof SerializedPage) == false) - { - return false; - } - SerializedPage rhs = (SerializedPage)obj; - return Objects.equal(getPageId(), rhs.getPageId()) && - Objects.equal(getSessionId(), rhs.getSessionId()); - } - - @Override - public int hashCode() - { - return Objects.hashCode(getPageId(), getSessionId()); - } - } - - /** - * - * @param sessionId - * @param page - * @return the serialized page information - */ - protected SerializedPage createSerializedPage(final String sessionId, final IManageablePage page) - { - Args.notNull(sessionId, "sessionId"); - Args.notNull(page, "page"); - - SerializedPage serializedPage = null; - - byte[] data = serializePage(page); - - if (data != null) - { - serializedPage = new SerializedPage(sessionId, page.getPageId(), data); - } - else if (LOG.isWarnEnabled()) - { - LOG.warn("Page {} cannot be serialized. See previous logs for possible reasons.", page); - } - return serializedPage; - } - - /** - * Cache that stores serialized pages. This is important to make sure that a single page is not - * serialized twice or more when not necessary. - * <p> - * For example a page is serialized during request, but it might be also later serialized on - * session replication. The purpose of this cache is to make sure that the data obtained from - * first serialization is reused on second serialization. - * - * @author Matej Knopp - */ - static class SerializedPagesCache implements SecondLevelPageCache<String, Integer, SerializedPage> - { - private final int maxSize; - - private final ConcurrentLinkedDeque<SoftReference<SerializedPage>> cache; - - /** - * Constructor. - * - * @param maxSize - * The maximum number of entries to cache - */ - public SerializedPagesCache(final int maxSize) - { - this.maxSize = maxSize; - cache = new ConcurrentLinkedDeque<>(); - } - - /** - * - * @param sessionId - * @param pageId - * @return the removed {@link SerializedPage} or <code>null</code> - otherwise - */ - @Override - public SerializedPage removePage(final String sessionId, final Integer pageId) - { - if (maxSize > 0) - { - Args.notNull(sessionId, "sessionId"); - Args.notNull(pageId, "pageId"); - - SerializedPage sample = new SerializedPage(sessionId, pageId, null); - - for (Iterator<SoftReference<SerializedPage>> i = cache.iterator(); i.hasNext();) - { - SoftReference<SerializedPage> ref = i.next(); - SerializedPage entry = ref.get(); - if (sample.equals(entry)) - { - i.remove(); - return entry; - } - } - } - return null; - } - - /** - * Removes all {@link SerializedPage}s for the session with <code>sessionId</code> from the - * cache. - * - * @param sessionId - */ - @Override - public void removePages(String sessionId) - { - if (maxSize > 0) - { - Args.notNull(sessionId, "sessionId"); - - for (Iterator<SoftReference<SerializedPage>> i = cache.iterator(); i.hasNext();) - { - SoftReference<SerializedPage> ref = i.next(); - SerializedPage entry = ref.get(); - if (entry != null && entry.getSessionId().equals(sessionId)) - { - i.remove(); - } - } - } - } - - /** - * Returns a {@link SerializedPage} by looking it up by <code>sessionId</code> and - * <code>pageId</code>. If there is a match then it is <i>touched</i>, i.e. it is moved at - * the top of the cache. - * - * @param sessionId - * @param pageId - * @return the found serialized page or <code>null</code> when not found - */ - @Override - public SerializedPage getPage(String sessionId, Integer pageId) - { - SerializedPage result = null; - if (maxSize > 0) - { - Args.notNull(sessionId, "sessionId"); - Args.notNull(pageId, "pageId"); - - SerializedPage sample = new SerializedPage(sessionId, pageId, null); - - for (Iterator<SoftReference<SerializedPage>> i = cache.iterator(); i.hasNext();) - { - SoftReference<SerializedPage> ref = i.next(); - SerializedPage entry = ref.get(); - if (sample.equals(entry)) - { - i.remove(); - result = entry; - break; - } - } - - if (result != null) - { - // move to top - internalStore(result); - } - } - return result; - } - - /** - * Store the serialized page in cache - * - * @param page - * the data to serialize (page id, session id, bytes) - */ - @Override - public void storePage(String sessionId, Integer pageId, SerializedPage page) - { - if (maxSize > 0) - { - Args.notNull(sessionId, "sessionId"); - Args.notNull(pageId, "pageId"); - Args.notNull(page, "page"); - - for (Iterator<SoftReference<SerializedPage>> i = cache.iterator(); i.hasNext();) - { - SoftReference<SerializedPage> r = i.next(); - SerializedPage entry = r.get(); - if (entry != null && entry.equals(page)) - { - i.remove(); - break; - } - } - - internalStore(page); - } - } - - private void internalStore(SerializedPage page) - { - cache.push(new SoftReference<>(page)); - while (cache.size() > maxSize) - { - cache.pollLast(); - } - } - - @Override - public void destroy() - { - cache.clear(); - } - } - - @Override - public boolean canBeAsynchronous() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/DelegatingPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/DelegatingPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/DelegatingPageStore.java new file mode 100644 index 0000000..126509c --- /dev/null +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/DelegatingPageStore.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.pageStore; + +import org.apache.wicket.page.IManageablePage; +import org.apache.wicket.util.lang.Args; + +/** + * An {@link IPageStore} that delegates to another storage. + */ +public abstract class DelegatingPageStore implements IPageStore +{ + private final IPageStore delegate; + + protected DelegatingPageStore(IPageStore delegate) { + this.delegate = Args.notNull(delegate, "delegate"); + } + + public IPageStore getDelegate() + { + return delegate; + } + + @Override + public void addPage(IPageContext context, IManageablePage page) { + delegate.addPage(context, page); + } + + @Override + public void removePage(IPageContext context, IManageablePage page) { + delegate.removePage(context, page); + } + + @Override + public void removeAllPages(IPageContext context) { + delegate.removeAllPages(context); + } + + @Override + public IManageablePage getPage(IPageContext context, int id) { + return delegate.getPage(context, id); + } + + @Override + public void detach(IPageContext context) { + delegate.detach(context); + } + + @Override + public void destroy() { + delegate.destroy(); + } +} http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskDataStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskDataStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskDataStore.java deleted file mode 100644 index 2dd95e4..0000000 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskDataStore.java +++ /dev/null @@ -1,589 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.wicket.pageStore; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.io.RandomAccessFile; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.wicket.WicketRuntimeException; -import org.apache.wicket.pageStore.PageWindowManager.PageWindow; -import org.apache.wicket.util.file.Files; -import org.apache.wicket.util.io.IOUtils; -import org.apache.wicket.util.lang.Args; -import org.apache.wicket.util.lang.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A data store implementation which stores the data on disk (in a file system) - */ -public class DiskDataStore implements IDataStore -{ - private static final Logger log = LoggerFactory.getLogger(DiskDataStore.class); - - private static final String INDEX_FILE_NAME = "DiskDataStoreIndex"; - - private final String applicationName; - - private final Bytes maxSizePerPageSession; - - private final File fileStoreFolder; - - private final ConcurrentMap<String, SessionEntry> sessionEntryMap; - - /** - * Construct. - * - * @param applicationName - * @param fileStoreFolder - * @param maxSizePerSession - */ - public DiskDataStore(final String applicationName, final File fileStoreFolder, - final Bytes maxSizePerSession) - { - this.applicationName = applicationName; - this.fileStoreFolder = fileStoreFolder; - maxSizePerPageSession = Args.notNull(maxSizePerSession, "maxSizePerSession"); - sessionEntryMap = new ConcurrentHashMap<>(); - - try - { - if (this.fileStoreFolder.exists() || this.fileStoreFolder.mkdirs()) - { - loadIndex(); - } - else - { - log.warn("Cannot create file store folder for some reason."); - } - } - catch (SecurityException e) - { - throw new WicketRuntimeException( - "SecurityException occurred while creating DiskDataStore. Consider using a non-disk based IDataStore implementation. " - + "See org.apache.wicket.Application.setPageManagerProvider(IPageManagerProvider)", - e); - } - } - - /** - * @see org.apache.wicket.pageStore.IDataStore#destroy() - */ - @Override - public void destroy() - { - log.debug("Destroying..."); - saveIndex(); - log.debug("Destroyed."); - } - - /** - * @see org.apache.wicket.pageStore.IDataStore#getData(java.lang.String, int) - */ - @Override - public byte[] getData(final String sessionId, final int id) - { - byte[] pageData = null; - SessionEntry sessionEntry = getSessionEntry(sessionId, false); - if (sessionEntry != null) - { - pageData = sessionEntry.loadPage(id); - } - - if (log.isDebugEnabled()) - { - log.debug("Returning data{} for page with id '{}' in session with id '{}'", - pageData != null ? "" : "(null)", id, sessionId); - } - return pageData; - } - - /** - * @see org.apache.wicket.pageStore.IDataStore#isReplicated() - */ - @Override - public boolean isReplicated() - { - return false; - } - - /** - * @see org.apache.wicket.pageStore.IDataStore#removeData(java.lang.String, int) - */ - @Override - public void removeData(final String sessionId, final int id) - { - SessionEntry sessionEntry = getSessionEntry(sessionId, false); - if (sessionEntry != null) - { - if (log.isDebugEnabled()) - { - log.debug("Removing data for page with id '{}' in session with id '{}'", id, sessionId); - } - sessionEntry.removePage(id); - } - } - - /** - * @see org.apache.wicket.pageStore.IDataStore#removeData(java.lang.String) - */ - @Override - public void removeData(final String sessionId) - { - SessionEntry sessionEntry = getSessionEntry(sessionId, false); - if (sessionEntry != null) - { - log.debug("Removing data for pages in session with id '{}'", sessionId); - synchronized (sessionEntry) - { - sessionEntryMap.remove(sessionEntry.sessionId); - sessionEntry.unbind(); - } - } - } - - /** - * @see org.apache.wicket.pageStore.IDataStore#storeData(java.lang.String, int, byte[]) - */ - @Override - public void storeData(final String sessionId, final int id, final byte[] data) - { - SessionEntry sessionEntry = getSessionEntry(sessionId, true); - if (sessionEntry != null) - { - log.debug("Storing data for page with id '{}' in session with id '{}'", id, sessionId); - sessionEntry.savePage(id, data); - } - } - - /** - * - * @param sessionId - * @param create - * @return the session entry - */ - protected SessionEntry getSessionEntry(final String sessionId, final boolean create) - { - if (!create) - { - return sessionEntryMap.get(sessionId); - } - - SessionEntry entry = new SessionEntry(this, sessionId); - SessionEntry existing = sessionEntryMap.putIfAbsent(sessionId, entry); - return existing != null ? existing : entry; - } - - /** - * Load the index - */ - @SuppressWarnings("unchecked") - private void loadIndex() - { - File storeFolder = getStoreFolder(); - File index = new File(storeFolder, INDEX_FILE_NAME); - if (index.exists() && index.length() > 0) - { - try - { - InputStream stream = new FileInputStream(index); - ObjectInputStream ois = new ObjectInputStream(stream); - try - { - Map<String, SessionEntry> map = (Map<String, SessionEntry>)ois.readObject(); - sessionEntryMap.clear(); - sessionEntryMap.putAll(map); - - for (Entry<String, SessionEntry> entry : sessionEntryMap.entrySet()) - { - // initialize the diskPageStore reference - SessionEntry sessionEntry = entry.getValue(); - sessionEntry.diskDataStore = this; - } - } finally { - stream.close(); - ois.close(); - } - } - catch (Exception e) - { - log.error("Couldn't load DiskDataStore index from file " + index + ".", e); - } - } - Files.remove(index); - } - - /** - * - */ - private void saveIndex() - { - File storeFolder = getStoreFolder(); - if (storeFolder.exists()) - { - File index = new File(storeFolder, INDEX_FILE_NAME); - Files.remove(index); - try - { - OutputStream stream = new FileOutputStream(index); - ObjectOutputStream oos = new ObjectOutputStream(stream); - try - { - Map<String, SessionEntry> map = new HashMap<>(sessionEntryMap.size()); - for (Entry<String, SessionEntry> e : sessionEntryMap.entrySet()) - { - if (e.getValue().unbound == false) - { - map.put(e.getKey(), e.getValue()); - } - } - oos.writeObject(map); - } finally { - stream.close(); - oos.close(); - } - } - catch (Exception e) - { - log.error("Couldn't write DiskDataStore index to file " + index + ".", e); - } - } - } - - /** - * - */ - protected static class SessionEntry implements Serializable - { - private static final long serialVersionUID = 1L; - - private final String sessionId; - private transient DiskDataStore diskDataStore; - private String fileName; - private PageWindowManager manager; - private boolean unbound = false; - - protected SessionEntry(DiskDataStore diskDataStore, String sessionId) - { - this.diskDataStore = diskDataStore; - this.sessionId = sessionId; - } - - public PageWindowManager getManager() - { - if (manager == null) - { - manager = new PageWindowManager(diskDataStore.maxSizePerPageSession.bytes()); - } - return manager; - } - - private String getFileName() - { - if (fileName == null) - { - fileName = diskDataStore.getSessionFileName(sessionId, true); - } - return fileName; - } - - /** - * @return session id - */ - public String getSessionId() - { - return sessionId; - } - - /** - * Saves the serialized page to appropriate file. - * - * @param pageId - * @param data - */ - public synchronized void savePage(int pageId, byte data[]) - { - if (unbound) - { - return; - } - // only save page that has some data - if (data != null) - { - // allocate window for page - PageWindow window = getManager().createPageWindow(pageId, data.length); - - FileChannel channel = getFileChannel(true); - if (channel != null) - { - try - { - // write the content - channel.write(ByteBuffer.wrap(data), window.getFilePartOffset()); - } - catch (IOException e) - { - log.error("Error writing to a channel " + channel, e); - } - finally - { - IOUtils.closeQuietly(channel); - } - } - else - { - log.warn( - "Cannot save page with id '{}' because the data file cannot be opened.", - pageId); - } - } - } - - /** - * Removes the page from pagemap file. - * - * @param pageId - */ - public synchronized void removePage(int pageId) - { - if (unbound) - { - return; - } - getManager().removePage(pageId); - } - - /** - * Loads the part of pagemap file specified by the given PageWindow. - * - * @param window - * @return serialized page data - */ - public byte[] loadPage(PageWindow window) - { - byte[] result = null; - FileChannel channel = getFileChannel(false); - if (channel != null) - { - ByteBuffer buffer = ByteBuffer.allocate(window.getFilePartSize()); - try - { - channel.read(buffer, window.getFilePartOffset()); - if (buffer.hasArray()) - { - result = buffer.array(); - } - } - catch (IOException e) - { - log.error("Error reading from file channel " + channel, e); - } - finally - { - IOUtils.closeQuietly(channel); - } - } - return result; - } - - private FileChannel getFileChannel(boolean create) - { - FileChannel channel = null; - File file = new File(getFileName()); - if (create || file.exists()) - { - String mode = create ? "rw" : "r"; - try - { - RandomAccessFile randomAccessFile = new RandomAccessFile(file, mode); - channel = randomAccessFile.getChannel(); - } - catch (FileNotFoundException fnfx) - { - // can happen if the file is locked. WICKET-4176 - log.error(fnfx.getMessage(), fnfx); - } - } - return channel; - } - - /** - * Loads the specified page data. - * - * @param id - * @return page data or null if the page is no longer in pagemap file - */ - public synchronized byte[] loadPage(int id) - { - if (unbound) - { - return null; - } - byte[] result = null; - PageWindow window = getManager().getPageWindow(id); - if (window != null) - { - result = loadPage(window); - } - return result; - } - - /** - * Deletes all files for this session. - */ - public synchronized void unbind() - { - File sessionFolder = diskDataStore.getSessionFolder(sessionId, false); - if (sessionFolder.exists()) - { - Files.removeFolder(sessionFolder); - cleanup(sessionFolder); - } - unbound = true; - } - - /** - * deletes the sessionFolder's parent and grandparent, if (and only if) they are empty. - * - * @see #createPathFrom(String sessionId) - * @param sessionFolder - * must not be null - */ - private void cleanup(final File sessionFolder) - { - File high = sessionFolder.getParentFile(); - if (high != null && high.list().length == 0) - { - if (Files.removeFolder(high)) - { - File low = high.getParentFile(); - if (low != null && low.list().length == 0) - { - Files.removeFolder(low); - } - } - } - } - } - - /** - * Returns the file name for specified session. If the session folder (folder that contains the - * file) does not exist and createSessionFolder is true, the folder will be created. - * - * @param sessionId - * @param createSessionFolder - * @return file name for pagemap - */ - private String getSessionFileName(String sessionId, boolean createSessionFolder) - { - File sessionFolder = getSessionFolder(sessionId, createSessionFolder); - return new File(sessionFolder, "data").getAbsolutePath(); - } - - /** - * This folder contains sub-folders named as the session id for which they hold the data. - * - * @return the folder where the pages are stored - */ - protected File getStoreFolder() - { - return new File(fileStoreFolder, applicationName + "-filestore"); - } - - /** - * Returns the folder for the specified sessions. If the folder doesn't exist and the create - * flag is set, the folder will be created. - * - * @param sessionId - * @param create - * @return folder used to store session data - */ - protected File getSessionFolder(String sessionId, final boolean create) - { - File storeFolder = getStoreFolder(); - - sessionId = sessionId.replace('*', '_'); - sessionId = sessionId.replace('/', '_'); - sessionId = sessionId.replace(':', '_'); - - sessionId = createPathFrom(sessionId); - - File sessionFolder = new File(storeFolder, sessionId); - if (create && sessionFolder.exists() == false) - { - Files.mkdirs(sessionFolder); - } - return sessionFolder; - } - - /** - * creates a three-level path from the sessionId in the format 0000/0000/<sessionId>. The two - * prefixing directories are created from the sessionId's hashcode and thus, should be well - * distributed. - * - * This is used to avoid problems with Filesystems allowing no more than 32k entries in a - * directory. - * - * Note that the prefix paths are created from Integers and not guaranteed to be four chars - * long. - * - * @param sessionId - * must not be null - * @return path in the form 0000/0000/sessionId - */ - private String createPathFrom(final String sessionId) - { - int sessionIdHashCode = sessionId.hashCode(); - if (sessionIdHashCode == Integer.MIN_VALUE) { - // Math.abs(MIN_VALUE) == MIN_VALUE, so avoid it - sessionIdHashCode += 1; - } - int hash = Math.abs(sessionIdHashCode); - String low = String.valueOf(hash % 9973); - String high = String.valueOf((hash / 9973) % 9973); - StringBuilder bs = new StringBuilder(sessionId.length() + 10); - bs.append(low); - bs.append(File.separator); - bs.append(high); - bs.append(File.separator); - bs.append(sessionId); - - return bs.toString(); - } - - @Override - public boolean canBeAsynchronous() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/wicket/blob/c43d3a33/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskPageStore.java new file mode 100644 index 0000000..d3726f0 --- /dev/null +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/DiskPageStore.java @@ -0,0 +1,710 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.pageStore; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.servlet.http.HttpSessionBindingEvent; +import javax.servlet.http.HttpSessionBindingListener; + +import org.apache.wicket.Session; +import org.apache.wicket.WicketRuntimeException; +import org.apache.wicket.page.IManageablePage; +import org.apache.wicket.pageStore.disk.NestedFolders; +import org.apache.wicket.pageStore.disk.PageWindowManager; +import org.apache.wicket.pageStore.disk.PageWindowManager.FileWindow; +import org.apache.wicket.protocol.http.PageExpiredException; +import org.apache.wicket.serialize.ISerializer; +import org.apache.wicket.util.file.Files; +import org.apache.wicket.util.io.IOUtils; +import org.apache.wicket.util.lang.Args; +import org.apache.wicket.util.lang.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A storage of pages on disk. + * <p> + * Implementation note: {@link DiskPageStore} writes pages into a single file, appending new pages while overwriting the oldest pages. + * Since Ajax requests do not change the id of a page, {@link DiskPageStore} offers an optimization to overwrite the most recently written + * page, if it has the same id as a new page to write.<p> + * However this does not help in case of alternating requests between multiple browser windows: In this case requests are processed for + * different page ids and the oldests pages are constantly overwritten (this can easily happen with Ajax timers on one or more pages). + * This leads to pages with identical id superfluously kept in the file, while older pages are prematurely expelled. + * Any following request to these older pages will then fail {@link PageExpiredException}. + */ +public class DiskPageStore implements IPersistentPageStore +{ + private static final Logger log = LoggerFactory.getLogger(DiskPageStore.class); + + private static final String KEY = "wicket:DiskPageStore"; + + private static final String INDEX_FILE_NAME = "DiskPageStoreIndex"; + + /** + * A cache that holds all page stores. + */ + private static final ConcurrentMap<String, DiskPageStore> DISK_STORES = new ConcurrentHashMap<>(); + + private final String applicationName; + + private final ISerializer serializer; + + private final Bytes maxSizePerSession; + + private final NestedFolders folders; + + private final ConcurrentMap<String, DiskData> diskDatas; + + /** + * Create a store that supports {@link SerializedPage}s only. + * + * @param applicationName + * name of application + * @param fileStoreFolder + * folder to store to + * @param maxSizePerSession + * maximum size per session + * + * @see SerializingPageStore + */ + public DiskPageStore(String applicationName, File fileStoreFolder, Bytes maxSizePerSession) + { + this(applicationName, fileStoreFolder, maxSizePerSession, null); + } + + /** + * Create a store to disk. + * + * @param applicationName + * name of application + * @param fileStoreFolder + * folder to store to + * @param maxSizePerSession + * maximum size per session + * @param serializer + * for serialization of pages + */ + public DiskPageStore(String applicationName, File fileStoreFolder, Bytes maxSizePerSession, + ISerializer serializer) + { + this.applicationName = Args.notNull(applicationName, "applicationName"); + this.folders = new NestedFolders(new File(fileStoreFolder, applicationName + "-filestore")); + this.maxSizePerSession = Args.notNull(maxSizePerSession, "maxSizePerSession"); + this.serializer = serializer; // optional + + this.diskDatas = new ConcurrentHashMap<>(); + + try + { + if (folders.getBase().exists() || folders.getBase().mkdirs()) + { + loadIndex(); + } + else + { + log.warn("Cannot create file store folder for some reason."); + } + } + catch (SecurityException e) + { + throw new WicketRuntimeException( + "SecurityException occurred while creating DiskPageStore. Consider using a non-disk based IPageStore implementation. " + + "See org.apache.wicket.Application.setPageManagerProvider(IPageManagerProvider)", + e); + } + + if (DISK_STORES.containsKey(applicationName)) + { + throw new IllegalStateException( + "Store for application with key '" + applicationName + "' already exists."); + } + DISK_STORES.put(applicationName, this); + } + + @Override + public void destroy() + { + log.debug("Destroying..."); + saveIndex(); + + DISK_STORES.remove(applicationName); + + log.debug("Destroyed."); + } + + @Override + public boolean canBeAsynchronous(IPageContext context) + { + // session attribute must be added here *before* any asynchronous calls + // when session is no longer available + getSessionAttribute(context, true); + + return true; + } + + @Override + public IManageablePage getPage(IPageContext context, int id) + { + IManageablePage page = null; + + DiskData diskData = getDiskData(context, false); + if (diskData != null) + { + byte[] data = diskData.loadPage(id); + if (data != null) + { + if (serializer == null) + { + page = new SerializedPage(id, "unknown", data); + } + else + { + page = (IManageablePage)serializer.deserialize(data); + } + + if (log.isDebugEnabled()) + { + log.debug("Returning page with id '{}' in session with id '{}'", id, context.getSessionId()); + } + } + } + + return page; + } + + @Override + public void removePage(IPageContext context, IManageablePage page) + { + DiskData diskData = getDiskData(context, false); + if (diskData != null) + { + if (log.isDebugEnabled()) + { + log.debug("Removing page with id '{}' in session with id '{}'", page.getPageId(), + context.getSessionId()); + } + diskData.removeData(page.getPageId()); + } + } + + @Override + public void removeAllPages(IPageContext context) + { + DiskData diskData = getDiskData(context, false); + if (diskData != null) + { + removeDiskData(diskData); + } + } + + protected void removeDiskData(DiskData diskData) + { + synchronized (diskDatas) + { + diskDatas.remove(diskData.sessionIdentifier); + diskData.unbind(); + } + } + + /** + * Supports {@link SerializedPage}s too - for this to work the delegating {@link IPageStore} + * must use the same {@link ISerializer} as this one. + */ + @Override + public void addPage(IPageContext context, IManageablePage page) + { + DiskData diskData = getDiskData(context, true); + if (diskData != null) + { + log.debug("Storing data for page with id '{}' in session with id '{}'", + page.getPageId(), context.getSessionId()); + + byte[] data; + String type; + if (page instanceof SerializedPage) + { + data = ((SerializedPage)page).getData(); + type = ((SerializedPage)page).getPageType(); + } + else + { + if (serializer == null) + { + throw new WicketRuntimeException( + "DiskPageStore not configured for serialization"); + } + data = serializer.serialize(page); + type = page.getClass().getName(); + } + + diskData.savePage(page.getPageId(), type, data); + } + } + + /** + * + * @param context + * @param create + * @return the session entry + */ + protected DiskData getDiskData(final IPageContext context, final boolean create) + { + SessionAttribute attribute = getSessionAttribute(context, create); + + if (!create && attribute == null) + { + return null; + } + + return getDiskData(attribute.identifier, create); + } + + protected DiskData getDiskData(String sessionIdentifier, boolean create) + { + if (!create) + { + return diskDatas.get(sessionIdentifier); + } + + DiskData data = new DiskData(this, sessionIdentifier); + DiskData existing = diskDatas.putIfAbsent(sessionIdentifier, data); + return existing != null ? existing : data; + } + + protected SessionAttribute getSessionAttribute(IPageContext context, boolean create) + { + context.bind(); + + SessionAttribute attribute = context.getSessionAttribute(KEY); + if (attribute == null && create) + { + attribute = new SessionAttribute(applicationName, context.getSessionId()); + context.setSessionAttribute(KEY, attribute); + } + return attribute; + } + + /** + * Load the index + */ + @SuppressWarnings("unchecked") + private void loadIndex() + { + File storeFolder = folders.getBase(); + + File index = new File(storeFolder, INDEX_FILE_NAME); + if (index.exists() && index.length() > 0) + { + try + { + InputStream stream = new FileInputStream(index); + ObjectInputStream ois = new ObjectInputStream(stream); + try + { + diskDatas.clear(); + + for (DiskData diskData : (List<DiskData>)ois.readObject()) + { + diskData.pageStore = this; + diskDatas.put(diskData.sessionIdentifier, diskData); + } + } + finally + { + stream.close(); + ois.close(); + } + } + catch (Exception e) + { + log.error("Couldn't load DiskPageStore index from file " + index + ".", e); + } + } + Files.remove(index); + } + + /** + * + */ + private void saveIndex() + { + File storeFolder = folders.getBase(); + if (storeFolder.exists()) + { + File index = new File(storeFolder, INDEX_FILE_NAME); + Files.remove(index); + try + { + OutputStream stream = new FileOutputStream(index); + ObjectOutputStream oos = new ObjectOutputStream(stream); + try + { + List<DiskData> list = new ArrayList<>(diskDatas.size()); + for (DiskData diskData : diskDatas.values()) + { + if (diskData.sessionIdentifier != null) + { + list.add(diskData); + } + } + oos.writeObject(list); + } + finally + { + stream.close(); + oos.close(); + } + } + catch (Exception e) + { + log.error("Couldn't write DiskPageStore index to file " + index + ".", e); + } + } + } + + @Override + public Set<String> getContextIdentifiers() + { + return Collections.unmodifiableSet(diskDatas.keySet()); + } + + /** + * + * @param session + * key + * @return a list of the last N page windows + */ + @Override + public List<IPersistedPage> getPersistentPages(String sessionIdentifier) + { + List<IPersistedPage> pages = new ArrayList<>(); + + DiskData diskData = getDiskData(sessionIdentifier, false); + if (diskData != null) + { + PageWindowManager windowManager = diskData.getManager(); + + pages.addAll(windowManager.getFileWindows()); + } + return pages; + } + + @Override + public String getContextIdentifier(IPageContext context) + { + SessionAttribute sessionAttribute = getSessionAttribute(context, true); + + return sessionAttribute.identifier; + } + + @Override + public Bytes getTotalSize() + { + long size = 0; + + synchronized (diskDatas) + { + for (DiskData diskData : diskDatas.values()) + { + size = size + diskData.size(); + } + } + + return Bytes.bytes(size); + } + + /** + * Data held on disk. + */ + protected static class DiskData implements Serializable + { + private static final long serialVersionUID = 1L; + + private transient DiskPageStore pageStore; + + private transient String fileName; + + private String sessionIdentifier; + + private PageWindowManager manager; + + protected DiskData(DiskPageStore pageStore, String sessionIdentifier) + { + this.pageStore = pageStore; + + this.sessionIdentifier = sessionIdentifier; + } + + public long size() + { + return manager.getTotalSize(); + } + + public PageWindowManager getManager() + { + if (manager == null) + { + manager = new PageWindowManager(pageStore.maxSizePerSession.bytes()); + } + return manager; + } + + private String getFileName() + { + if (fileName == null) + { + fileName = pageStore.getSessionFileName(sessionIdentifier, true); + } + return fileName; + } + + /** + * @return session id + */ + public String getKey() + { + return sessionIdentifier; + } + + /** + * Saves the serialized page to appropriate file. + * + * @param pageId + * @param pageType + * @param data + */ + public synchronized void savePage(int pageId, String pageType, byte data[]) + { + if (sessionIdentifier == null) + { + return; + } + + // only save page that has some data + if (data != null) + { + // allocate window for page + FileWindow window = getManager().createPageWindow(pageId, pageType, data.length); + + FileChannel channel = getFileChannel(true); + if (channel != null) + { + try + { + // write the content + channel.write(ByteBuffer.wrap(data), window.getFilePartOffset()); + } + catch (IOException e) + { + log.error("Error writing to a channel " + channel, e); + } + finally + { + IOUtils.closeQuietly(channel); + } + } + else + { + log.warn( + "Cannot save page with id '{}' because the data file cannot be opened.", + pageId); + } + } + } + + /** + * Removes the page from pagemap file. + * + * @param pageId + */ + public synchronized void removeData(int pageId) + { + if (sessionIdentifier == null) + { + return; + } + + getManager().removePage(pageId); + } + + /** + * Loads the part of pagemap file specified by the given PageWindow. + * + * @param window + * @return serialized page data + */ + public byte[] loadData(FileWindow window) + { + byte[] result = null; + FileChannel channel = getFileChannel(false); + if (channel != null) + { + ByteBuffer buffer = ByteBuffer.allocate(window.getFilePartSize()); + try + { + channel.read(buffer, window.getFilePartOffset()); + if (buffer.hasArray()) + { + result = buffer.array(); + } + } + catch (IOException e) + { + log.error("Error reading from file channel " + channel, e); + } + finally + { + IOUtils.closeQuietly(channel); + } + } + return result; + } + + private FileChannel getFileChannel(boolean create) + { + FileChannel channel = null; + File file = new File(getFileName()); + if (create || file.exists()) + { + String mode = create ? "rw" : "r"; + try + { + RandomAccessFile randomAccessFile = new RandomAccessFile(file, mode); + channel = randomAccessFile.getChannel(); + } + catch (FileNotFoundException fnfx) + { + // can happen if the file is locked. WICKET-4176 + log.error(fnfx.getMessage(), fnfx); + } + } + return channel; + } + + /** + * Loads the specified page data. + * + * @param id + * @return page data or null if the page is no longer in pagemap file + */ + public synchronized byte[] loadPage(int id) + { + if (sessionIdentifier == null) + { + return null; + } + + FileWindow window = getManager().getPageWindow(id); + if (window == null) + { + return null; + } + + return loadData(window); + } + + /** + * Deletes all files for this session. + */ + public synchronized void unbind() + { + pageStore.folders.remove(sessionIdentifier); + + sessionIdentifier = null; + } + } + + /** + * Returns the file name for specified session. If the session folder (folder that contains the + * file) does not exist and createSessionFolder is true, the folder will be created. + * + * @param sessionIdentifier + * @param createSessionFolder + * @return file name for pagemap + */ + private String getSessionFileName(String sessionIdentifier, boolean createSessionFolder) + { + File sessionFolder = folders.get(sessionIdentifier, createSessionFolder); + return new File(sessionFolder, "data").getAbsolutePath(); + } + + /** + * Attribute held in session. + */ + static class SessionAttribute implements Serializable, HttpSessionBindingListener + { + + private final String applicationName; + + /** + * The identifier of the session, must not be equal to {@link Session#getId()}, e.g. when + * the container changes the id after authorization. + */ + public final String identifier; + + public SessionAttribute(String applicationName, String sessionIdentifier) + { + this.applicationName = Args.notNull(applicationName, "applicationName"); + this.identifier = Args.notNull(sessionIdentifier, "sessionIdentifier"); + } + + + @Override + public void valueBound(HttpSessionBindingEvent event) + { + } + + @Override + public void valueUnbound(HttpSessionBindingEvent event) + { + DiskPageStore store = DISK_STORES.get(applicationName); + if (store == null) + { + log.warn( + "Cannot remove data '{}' because disk store for application '{}' is no longer present.", + identifier, applicationName); + } + else + { + DiskData diskData = store.getDiskData(identifier, false); + if (diskData != null) + { + store.removeDiskData(diskData); + } + } + } + } +} \ No newline at end of file
