Repository: wicket
Updated Branches:
  refs/heads/wicket-8.x be57b4889 -> fc8a5ab4d


[WICKET-6603] Async page/data store destroyed without hanging


Project: http://git-wip-us.apache.org/repos/asf/wicket/repo
Commit: http://git-wip-us.apache.org/repos/asf/wicket/commit/1a558850
Tree: http://git-wip-us.apache.org/repos/asf/wicket/tree/1a558850
Diff: http://git-wip-us.apache.org/repos/asf/wicket/diff/1a558850

Branch: refs/heads/wicket-8.x
Commit: 1a558850ff4cd4bb56c4f32a613f116dde7ea617
Parents: be57b48
Author: Maxim Solodovnik <[email protected]>
Authored: Wed Oct 24 14:51:26 2018 +0700
Committer: Maxim Solodovnik <[email protected]>
Committed: Wed Oct 24 14:51:26 2018 +0700

----------------------------------------------------------------------
 .../wicket/pageStore/AsynchronousDataStore.java | 57 +++++++++----------
 .../wicket/pageStore/AsynchronousPageStore.java | 58 +++++++++-----------
 2 files changed, 50 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/wicket/blob/1a558850/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
----------------------------------------------------------------------
diff --git 
a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
 
b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
index 7ccd689..1c25ae6 100644
--- 
a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
+++ 
b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.wicket.util.lang.Args;
 import org.slf4j.Logger;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * It starts only one instance of {@link PageSavingRunnable} because all we 
need is to make the page
  * storing asynchronous. We don't want to write concurrently in the wrapped 
{@link IDataStore},
  * though it may happen in the extreme case when the queue is full. These 
cases should be avoided.
- * 
+ *
  * @author Matej Knopp
  */
 public class AsynchronousDataStore implements IDataStore
@@ -77,9 +78,11 @@ public class AsynchronousDataStore implements IDataStore
         */
        private final ConcurrentMap<String, Entry> entryMap;
 
+       private AtomicBoolean operates = new AtomicBoolean(true);
+
        /**
         * Construct.
-        * 
+        *
         * @param dataStore
         *            the wrapped {@link IDataStore} that actually saved the 
data
         * @param capacity
@@ -91,8 +94,7 @@ public class AsynchronousDataStore implements IDataStore
                entries = new LinkedBlockingQueue<>(capacity);
                entryMap = new ConcurrentHashMap<>();
 
-               PageSavingRunnable savingRunnable = new 
PageSavingRunnable(dataStore, entries, entryMap);
-               pageSavingThread = new Thread(savingRunnable, 
"Wicket-AsyncDataStore-PageSavingThread");
+               pageSavingThread = new Thread(new PageSavingRunnable(), 
"Wicket-AsyncDataStore-PageSavingThread");
                pageSavingThread.setDaemon(true);
                pageSavingThread.start();
        }
@@ -100,9 +102,9 @@ public class AsynchronousDataStore implements IDataStore
        @Override
        public void destroy()
        {
+               operates.compareAndSet(true, false);
                if (pageSavingThread.isAlive())
                {
-                       pageSavingThread.interrupt();
                        try
                        {
                                pageSavingThread.join();
@@ -117,7 +119,7 @@ public class AsynchronousDataStore implements IDataStore
 
        /**
         * Little helper
-        * 
+        *
         * @param sessionId
         * @param id
         * @return Entry
@@ -192,12 +194,16 @@ public class AsynchronousDataStore implements IDataStore
        /**
         * Save the entry in the queue if there is a room or directly pass it 
to the wrapped
         * {@link IDataStore} if there is no such
-        * 
+        *
         * @see 
org.apache.wicket.pageStore.IDataStore#storeData(java.lang.String, int, byte[])
         */
        @Override
        public void storeData(final String sessionId, final int id, final 
byte[] data)
        {
+               if (!operates.get())
+               {
+                       return;
+               }
                Entry entry = new Entry(sessionId, id, data);
                String key = getKey(entry);
                entryMap.put(key, entry);
@@ -216,13 +222,16 @@ public class AsynchronousDataStore implements IDataStore
                catch (InterruptedException e)
                {
                        log.error(e.getMessage(), e);
-                       entryMap.remove(key);
-                       dataStore.storeData(sessionId, id, data);
+                       if (operates.get())
+                       {
+                               entryMap.remove(key);
+                               dataStore.storeData(sessionId, id, data);
+                       }
                }
        }
 
        /**
-        * 
+        *
         * @param pageId
         * @param sessionId
         * @return generated key
@@ -233,7 +242,7 @@ public class AsynchronousDataStore implements IDataStore
        }
 
        /**
-        * 
+        *
         * @param entry
         * @return generated key
         */
@@ -301,28 +310,12 @@ public class AsynchronousDataStore implements IDataStore
        /**
         * The thread that acts as consumer of {@link Entry}ies
         */
-       private static class PageSavingRunnable implements Runnable
+       private class PageSavingRunnable implements Runnable
        {
-               private static final Logger log = 
LoggerFactory.getLogger(PageSavingRunnable.class);
-
-               private final BlockingQueue<Entry> entries;
-
-               private final ConcurrentMap<String, Entry> entryMap;
-
-               private final IDataStore dataStore;
-
-               private PageSavingRunnable(IDataStore dataStore, 
BlockingQueue<Entry> entries,
-                       ConcurrentMap<String, Entry> entryMap)
-               {
-                       this.dataStore = dataStore;
-                       this.entries = entries;
-                       this.entryMap = entryMap;
-               }
-
                @Override
                public void run()
                {
-                       while (!Thread.interrupted())
+                       while (operates.get())
                        {
                                Entry entry = null;
                                try
@@ -331,12 +324,12 @@ public class AsynchronousDataStore implements IDataStore
                                }
                                catch (InterruptedException e)
                                {
-                                       Thread.currentThread().interrupt();
+                                       log.debug("PageSavingRunnable:: 
Interrupted...");
                                }
 
-                               if (entry != null)
+                               if (entry != null && operates.get())
                                {
-                                       log.debug("Saving asynchronously: 
{}...", entry);
+                                       log.debug("PageSavingRunnable:: Saving 
asynchronously: {}...", entry);
                                        dataStore.storeData(entry.sessionId, 
entry.pageId, entry.data);
                                        entryMap.remove(getKey(entry));
                                }

http://git-wip-us.apache.org/repos/asf/wicket/blob/1a558850/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java
----------------------------------------------------------------------
diff --git 
a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java
 
b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java
index 43993ba..dfb4d19 100644
--- 
a/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java
+++ 
b/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousPageStore.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.wicket.page.IManageablePage;
 import org.apache.wicket.util.lang.Args;
@@ -39,9 +40,9 @@ import org.slf4j.LoggerFactory;
  * It starts only one instance of {@link PageSavingRunnable} because all we 
need is to make the page
  * storing asynchronous. We don't want to write concurrently in the wrapped 
{@link IPageStore},
  * though it may happen in the extreme case when the queue is full. These 
cases should be avoided.
- * 
+ *
  * Based on AsynchronousDataStore (@author Matej Knopp).
- * 
+ *
  * @author manuelbarzi
  */
 public class AsynchronousPageStore implements IPageStore
@@ -81,9 +82,11 @@ public class AsynchronousPageStore implements IPageStore
         */
        private final ConcurrentMap<String, Entry> entryMap;
 
+       private AtomicBoolean operates = new AtomicBoolean(true);
+
        /**
         * Construct.
-        * 
+        *
         * @param delegate
         *            the wrapped {@link IPageStore} that actually saved the 
page
         * @param capacity
@@ -95,15 +98,14 @@ public class AsynchronousPageStore implements IPageStore
                entries = new LinkedBlockingQueue<>(capacity);
                entryMap = new ConcurrentHashMap<>();
 
-               PageSavingRunnable savingRunnable = new 
PageSavingRunnable(delegate, entries, entryMap);
-               pageSavingThread = new Thread(savingRunnable, 
"Wicket-AsyncPageStore-PageSavingThread");
+               pageSavingThread = new Thread(new PageSavingRunnable(), 
"Wicket-AsyncPageStore-PageSavingThread");
                pageSavingThread.setDaemon(true);
                pageSavingThread.start();
        }
 
        /**
         * Little helper
-        * 
+        *
         * @param sessionId
         * @param pageId
         * @return Entry
@@ -114,7 +116,7 @@ public class AsynchronousPageStore implements IPageStore
        }
 
        /**
-        * 
+        *
         * @param pageId
         * @param sessionId
         * @return generated key
@@ -125,7 +127,7 @@ public class AsynchronousPageStore implements IPageStore
        }
 
        /**
-        * 
+        *
         * @param entry
         * @return generated key
         */
@@ -186,28 +188,12 @@ public class AsynchronousPageStore implements IPageStore
        /**
         * The thread that acts as consumer of {@link Entry}ies
         */
-       private static class PageSavingRunnable implements Runnable
+       private class PageSavingRunnable implements Runnable
        {
-               private static final Logger log = 
LoggerFactory.getLogger(PageSavingRunnable.class);
-
-               private final BlockingQueue<Entry> entries;
-
-               private final ConcurrentMap<String, Entry> entryMap;
-
-               private final IPageStore delegate;
-
-               private PageSavingRunnable(IPageStore delegate, 
BlockingQueue<Entry> entries,
-                                          ConcurrentMap<String, Entry> 
entryMap)
-               {
-                       this.delegate = delegate;
-                       this.entries = entries;
-                       this.entryMap = entryMap;
-               }
-
                @Override
                public void run()
                {
-                       while (!Thread.interrupted())
+                       while (operates.get())
                        {
                                Entry entry = null;
                                try
@@ -216,12 +202,12 @@ public class AsynchronousPageStore implements IPageStore
                                }
                                catch (InterruptedException e)
                                {
-                                       Thread.currentThread().interrupt();
+                                       log.debug("PageSavingRunnable:: 
Interrupted...");
                                }
 
-                               if (entry != null)
+                               if (entry != null && operates.get())
                                {
-                                       log.debug("Saving asynchronously: 
{}...", entry);
+                                       log.debug("PageSavingRunnable:: Saving 
asynchronously: {}...", entry);
                                        delegate.storePage(entry.sessionId, 
entry.page);
                                        entryMap.remove(getKey(entry));
                                }
@@ -232,9 +218,9 @@ public class AsynchronousPageStore implements IPageStore
        @Override
        public void destroy()
        {
+               operates.compareAndSet(true, false);
                if (pageSavingThread.isAlive())
                {
-                       pageSavingThread.interrupt();
                        try
                        {
                                pageSavingThread.join();
@@ -244,7 +230,6 @@ public class AsynchronousPageStore implements IPageStore
                                log.error(e.getMessage(), e);
                        }
                }
-
                delegate.destroy();
        }
 
@@ -286,6 +271,10 @@ public class AsynchronousPageStore implements IPageStore
        @Override
        public void storePage(String sessionId, IManageablePage page)
        {
+               if (!operates.get())
+               {
+                       return;
+               }
                Entry entry = new Entry(sessionId, page);
                String key = getKey(entry);
                entryMap.put(key, entry);
@@ -308,8 +297,11 @@ public class AsynchronousPageStore implements IPageStore
                catch (InterruptedException e)
                {
                        log.error(e.getMessage(), e);
-                       entryMap.remove(key);
-                       delegate.storePage(sessionId, page);
+                       if (operates.get())
+                       {
+                               entryMap.remove(key);
+                               delegate.storePage(sessionId, page);
+                       }
                }
        }
 

Reply via email to