Repository: wicket Updated Branches: refs/heads/wicket-8.x be57b4889 -> fc8a5ab4d
[WICKET-6603] Async page/data store destroyed without hanging Project: http://git-wip-us.apache.org/repos/asf/wicket/repo Commit: http://git-wip-us.apache.org/repos/asf/wicket/commit/1a558850 Tree: http://git-wip-us.apache.org/repos/asf/wicket/tree/1a558850 Diff: http://git-wip-us.apache.org/repos/asf/wicket/diff/1a558850 Branch: refs/heads/wicket-8.x Commit: 1a558850ff4cd4bb56c4f32a613f116dde7ea617 Parents: be57b48 Author: Maxim Solodovnik <[email protected]> Authored: Wed Oct 24 14:51:26 2018 +0700 Committer: Maxim Solodovnik <[email protected]> Committed: Wed Oct 24 14:51:26 2018 +0700 ---------------------------------------------------------------------- .../wicket/pageStore/AsynchronousDataStore.java | 57 +++++++++---------- .../wicket/pageStore/AsynchronousPageStore.java | 58 +++++++++----------- 2 files changed, 50 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/wicket/blob/1a558850/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java index 7ccd689..1c25ae6 100644 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.wicket.util.lang.Args; import org.slf4j.Logger; @@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory; * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page * 
storing asynchronous. We don't want to write concurrently in the wrapped {@link IDataStore}, * though it may happen in the extreme case when the queue is full. These cases should be avoided. - * + * * @author Matej Knopp */ public class AsynchronousDataStore implements IDataStore @@ -77,9 +78,11 @@ public class AsynchronousDataStore implements IDataStore */ private final ConcurrentMap<String, Entry> entryMap; + private AtomicBoolean operates = new AtomicBoolean(true); + /** * Construct. - * + * * @param dataStore * the wrapped {@link IDataStore} that actually saved the data * @param capacity @@ -91,8 +94,7 @@ public class AsynchronousDataStore implements IDataStore entries = new LinkedBlockingQueue<>(capacity); entryMap = new ConcurrentHashMap<>(); - PageSavingRunnable savingRunnable = new PageSavingRunnable(dataStore, entries, entryMap); - pageSavingThread = new Thread(savingRunnable, "Wicket-AsyncDataStore-PageSavingThread"); + pageSavingThread = new Thread(new PageSavingRunnable(), "Wicket-AsyncDataStore-PageSavingThread"); pageSavingThread.setDaemon(true); pageSavingThread.start(); } @@ -100,9 +102,9 @@ public class AsynchronousDataStore implements IDataStore @Override public void destroy() { + operates.compareAndSet(true, false); if (pageSavingThread.isAlive()) { - pageSavingThread.interrupt(); try { pageSavingThread.join(); @@ -117,7 +119,7 @@ public class AsynchronousDataStore implements IDataStore /** * Little helper - * + * * @param sessionId * @param id * @return Entry @@ -192,12 +194,16 @@ public class AsynchronousDataStore implements IDataStore /** * Save the entry in the queue if there is a room or directly pass it to the wrapped * {@link IDataStore} if there is no such - * + * * @see org.apache.wicket.pageStore.IDataStore#storeData(java.lang.String, int, byte[]) */ @Override public void storeData(final String sessionId, final int id, final byte[] data) { + if (!operates.get()) + { + return; + } Entry entry = new Entry(sessionId, id, data); String key 
= getKey(entry); entryMap.put(key, entry); @@ -216,13 +222,16 @@ public class AsynchronousDataStore implements IDataStore catch (InterruptedException e) { log.error(e.getMessage(), e); - entryMap.remove(key); - dataStore.storeData(sessionId, id, data); + if (operates.get()) + { + entryMap.remove(key); + dataStore.storeData(sessionId, id, data); + } } } /** - * + * * @param pageId * @param sessionId * @return generated key @@ -233,7 +242,7 @@ public class AsynchronousDataStore implements IDataStore } /** - * + * * @param entry * @return generated key */ @@ -301,28 +310,12 @@ public class AsynchronousDataStore implements IDataStore /** * The thread that acts as consumer of {@link Entry}ies */ - private static class PageSavingRunnable implements Runnable + private class PageSavingRunnable implements Runnable { - private static final Logger log = LoggerFactory.getLogger(PageSavingRunnable.class); - - private final BlockingQueue<Entry> entries; - - private final ConcurrentMap<String, Entry> entryMap; - - private final IDataStore dataStore; - - private PageSavingRunnable(IDataStore dataStore, BlockingQueue<Entry> entries, - ConcurrentMap<String, Entry> entryMap) - { - this.dataStore = dataStore; - this.entries = entries; - this.entryMap = entryMap; - } - @Override public void run() { - while (!Thread.interrupted()) + while (operates.get()) { Entry entry = null; try @@ -331,12 +324,12 @@ public class AsynchronousDataStore implements IDataStore } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + log.debug("PageSavingRunnable:: Interrupted..."); } - if (entry != null) + if (entry != null && operates.get()) { - log.debug("Saving asynchronously: {}...", entry); + log.debug("PageSavingRunnable:: Saving asynchronously: {}...", entry); dataStore.storeData(entry.sessionId, entry.pageId, entry.data); entryMap.remove(getKey(entry)); } 
http://git-wip-us.apache.org/repos/asf/wicket/blob/1a558850/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java ---------------------------------------------------------------------- diff --git a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java index 43993ba..dfb4d19 100644 --- a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java +++ b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.wicket.page.IManageablePage; import org.apache.wicket.util.lang.Args; @@ -39,9 +40,9 @@ import org.slf4j.LoggerFactory; * It starts only one instance of {@link PageSavingRunnable} because all we need is to make the page * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore}, * though it may happen in the extreme case when the queue is full. These cases should be avoided. - * + * * Based on AsynchronousDataStore (@author Matej Knopp). - * + * * @author manuelbarzi */ public class AsynchronousPageStore implements IPageStore @@ -81,9 +82,11 @@ public class AsynchronousPageStore implements IPageStore */ private final ConcurrentMap<String, Entry> entryMap; + private AtomicBoolean operates = new AtomicBoolean(true); + /** * Construct. 
- * + * * @param delegate * the wrapped {@link IPageStore} that actually saved the page * @param capacity @@ -95,15 +98,14 @@ public class AsynchronousPageStore implements IPageStore entries = new LinkedBlockingQueue<>(capacity); entryMap = new ConcurrentHashMap<>(); - PageSavingRunnable savingRunnable = new PageSavingRunnable(delegate, entries, entryMap); - pageSavingThread = new Thread(savingRunnable, "Wicket-AsyncPageStore-PageSavingThread"); + pageSavingThread = new Thread(new PageSavingRunnable(), "Wicket-AsyncPageStore-PageSavingThread"); pageSavingThread.setDaemon(true); pageSavingThread.start(); } /** * Little helper - * + * * @param sessionId * @param pageId * @return Entry @@ -114,7 +116,7 @@ public class AsynchronousPageStore implements IPageStore } /** - * + * * @param pageId * @param sessionId * @return generated key @@ -125,7 +127,7 @@ public class AsynchronousPageStore implements IPageStore } /** - * + * * @param entry * @return generated key */ @@ -186,28 +188,12 @@ public class AsynchronousPageStore implements IPageStore /** * The thread that acts as consumer of {@link Entry}ies */ - private static class PageSavingRunnable implements Runnable + private class PageSavingRunnable implements Runnable { - private static final Logger log = LoggerFactory.getLogger(PageSavingRunnable.class); - - private final BlockingQueue<Entry> entries; - - private final ConcurrentMap<String, Entry> entryMap; - - private final IPageStore delegate; - - private PageSavingRunnable(IPageStore delegate, BlockingQueue<Entry> entries, - ConcurrentMap<String, Entry> entryMap) - { - this.delegate = delegate; - this.entries = entries; - this.entryMap = entryMap; - } - @Override public void run() { - while (!Thread.interrupted()) + while (operates.get()) { Entry entry = null; try @@ -216,12 +202,12 @@ public class AsynchronousPageStore implements IPageStore } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + log.debug("PageSavingRunnable:: Interrupted..."); } 
- if (entry != null) + if (entry != null && operates.get()) { - log.debug("Saving asynchronously: {}...", entry); + log.debug("PageSavingRunnable:: Saving asynchronously: {}...", entry); delegate.storePage(entry.sessionId, entry.page); entryMap.remove(getKey(entry)); } @@ -232,9 +218,9 @@ public class AsynchronousPageStore implements IPageStore @Override public void destroy() { + operates.compareAndSet(true, false); if (pageSavingThread.isAlive()) { - pageSavingThread.interrupt(); try { pageSavingThread.join(); @@ -244,7 +230,6 @@ public class AsynchronousPageStore implements IPageStore log.error(e.getMessage(), e); } } - delegate.destroy(); } @@ -286,6 +271,10 @@ public class AsynchronousPageStore implements IPageStore @Override public void storePage(String sessionId, IManageablePage page) { + if (!operates.get()) + { + return; + } Entry entry = new Entry(sessionId, page); String key = getKey(entry); entryMap.put(key, entry); @@ -308,8 +297,11 @@ public class AsynchronousPageStore implements IPageStore catch (InterruptedException e) { log.error(e.getMessage(), e); - entryMap.remove(key); - delegate.storePage(sessionId, page); + if (operates.get()) + { + entryMap.remove(key); + delegate.storePage(sessionId, page); + } } }
