This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to annotated tag 
org.apache.sling.discovery.commons-1.0.0
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-discovery-commons.git

commit 5a5f5ee4585d1aec441657cad7acbde6f66bab57
Author: Stefan Egli <[email protected]>
AuthorDate: Fri Oct 23 11:28:24 2015 +0000

    SLING-5094 / SLING-5173 / SLING-4603 related : ensure that no async events 
are still in the queue before ConsistencyService.sync is invoked. This is 
achieved by enqueueing an additional async event which, once it gets triggered, 
guarantees that all previously queued async events have been handled. This 
mechanism ensures that, before the syncToken is written, all 
TopologyEventListeners have received a TOPOLOGY_CHANGING - and only that 
ensures that the syncToken mechanism carries a high guarantee.
    
    git-svn-id: 
https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/discovery/commons@1710175
 13f79535-47bb-0310-9956-ffa450edef68
---
 .../commons/providers/ViewStateManager.java        |  5 +-
 .../commons/providers/base/AsyncEventSender.java   | 33 ++++-----
 .../{AsyncEvent.java => AsyncTopologyEvent.java}   | 24 ++++++-
 .../providers/base/ViewStateManagerImpl.java       | 80 +++++++++++++++++-----
 .../commons/providers/base/TestHelper.java         |  2 +
 .../providers/base/TestViewStateManager.java       | 21 ++++--
 .../base/TestOakSyncTokenConsistencyService.java   |  5 +-
 7 files changed, 127 insertions(+), 43 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
 
b/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
index 8c71603..16a70c5 100644
--- 
a/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
+++ 
b/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
@@ -94,8 +94,9 @@ public interface ViewStateManager {
      * <p>
      * @param timeout time in millis to wait for at max - 0 to not wait at all 
- -1 
      * to wait indefinitely
-     * @return true if no more async events exist, false if the timeout hit 
early 
+     * @return 0 if no more async events exist, >0 the number of queued or 
in-flight (being sent)
+     * events if the timeout hit early
      */
-    boolean waitForAsyncEvents(long timeout);
+    int waitForAsyncEvents(long timeout);
 
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
 
b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
index 014f522..89b5b13 100644
--- 
a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
+++ 
b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
@@ -55,7 +55,12 @@ final class AsyncEventSender implements Runnable {
     
     /** Enqueues a particular event for asynchronous sending to a particular 
listener **/
     void enqueue(TopologyEventListener listener, TopologyEvent event) {
-        final AsyncEvent asyncEvent = new AsyncEvent(listener, event);
+        final AsyncTopologyEvent asyncEvent = new AsyncTopologyEvent(listener, 
event);
+        enqueue(asyncEvent);
+    }
+
+    /** Enqueues an AsyncEvent for later in-order execution **/
+    void enqueue(final AsyncEvent asyncEvent) {
         synchronized(eventQ) {
             eventQ.add(asyncEvent);
             if (logger.isDebugEnabled()) {
@@ -110,7 +115,7 @@ final class AsyncEventSender implements Runnable {
                         isSending = asyncEvent!=null;
                     }
                     if (asyncEvent!=null) {
-                        sendTopologyEvent(asyncEvent);
+                        asyncEvent.trigger();
                     }
                 } catch(Throwable th) {
                     // Even though we should never catch Error or 
RuntimeException
@@ -141,25 +146,21 @@ final class AsyncEventSender implements Runnable {
         }
     }
 
-    /** Actual sending of the asynchronous event - catches RuntimeExceptions a 
listener can send. (Error is caught outside) **/
-    private void sendTopologyEvent(AsyncEvent asyncEvent) {
-        logger.trace("sendTopologyEvent: start");
-        final TopologyEventListener listener = asyncEvent.listener;
-        final TopologyEvent event = asyncEvent.event;
-        try{
-            logger.debug("sendTopologyEvent: sending to listener: {}, event: 
{}", listener, event);
-            listener.handleTopologyEvent(event);
-        } catch(final Exception e) {
-            logger.warn("sendTopologyEvent: handler threw exception. handler: 
"+listener+", exception: "+e, e);
-        }
-        logger.trace("sendTopologyEvent: start: listener: {}, event: {}", 
listener, event);
-    }
-
     /** for testing only: checks whether there are any events being queued or 
sent **/
     boolean hasInFlightEvent() {
         synchronized(eventQ) {
             return isSending || !eventQ.isEmpty();
         }
     }
+
+    public int getInFlightEventCnt() {
+        synchronized(eventQ) {
+            int cnt = eventQ.size();
+            if (isSending) {
+                cnt++;
+            }
+            return cnt;
+        }
+    }
     
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEvent.java
 
b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncTopologyEvent.java
similarity index 61%
rename from 
src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEvent.java
rename to 
src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncTopologyEvent.java
index 1a73908..4d2a443 100644
--- 
a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEvent.java
+++ 
b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncTopologyEvent.java
@@ -20,12 +20,17 @@ package org.apache.sling.discovery.commons.providers.base;
 
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** SLING-4755 : encapsulates an event that yet has to be sent 
(asynchronously) for a particular listener **/
-final class AsyncEvent {
+final class AsyncTopologyEvent implements AsyncEvent {
+    
+    static final Logger logger = 
LoggerFactory.getLogger(AsyncTopologyEvent.class);
+
     final TopologyEventListener listener;
     final TopologyEvent event;
-    AsyncEvent(TopologyEventListener listener, TopologyEvent event) {
+    AsyncTopologyEvent(TopologyEventListener listener, TopologyEvent event) {
         if (listener==null) {
             throw new IllegalArgumentException("listener must not be null");
         }
@@ -37,6 +42,19 @@ final class AsyncEvent {
     }
     @Override
     public String toString() {
-        return "an AsyncEvent[event="+event+", listener="+listener+"]";
+        return "an AsyncTopologyEvent[event="+event+", listener="+listener+"]";
     }
+
+    /** Actual sending of the asynchronous event - catches RuntimeExceptions a 
listener can send. (Error is caught outside) **/
+    public void trigger() {
+        logger.trace("trigger: start");
+        try{
+            logger.debug("trigger: sending to listener: {}, event: {}", 
listener, event);
+            listener.handleTopologyEvent(event);
+        } catch(final Exception e) {
+            logger.warn("trigger: handler threw exception. handler: 
"+listener+", exception: "+e, e);
+        }
+        logger.trace("trigger: start: listener: {}, event: {}", listener, 
event);
+    }
+
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
 
b/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
index 9fa980b..61b39f6 100644
--- 
a/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
+++ 
b/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
@@ -512,26 +512,56 @@ public class ViewStateManagerImpl implements 
ViewStateManager {
                 // then:
                 // run the set consistencyService
                 final int lastModCnt = modCnt;
-                logger.info("handleNewViewNonDelayed: invoking 
consistencyService (modCnt={})", modCnt);
-                consistencyService.sync(newView,
-                        new Runnable() {
+                logger.info("handleNewViewNonDelayed: invoking 
waitForAsyncEvents, then consistencyService (modCnt={})", modCnt);
+                asyncEventSender.enqueue(new AsyncEvent() {
                     
-                    public void run() {
-                        logger.trace("consistencyService.callback.run: start. 
acquiring lock...");
+                    @Override
+                    public String toString() {
+                        return "the 
waitForAsyncEvents-flush-token-"+hashCode();
+                    }
+                    
+                    @Override
+                    public void trigger() {
+                        // when this event is triggered we're guaranteed to 
have 
+                        // no more async events - cos the async events are 
handled
+                        // in a queue and this AsyncEvent was put at the end 
of the
+                        // queue at enqueue time. So now we can go ahead.
+                        // the plus using such a token event is that others 
when
+                        // calling waitForAsyncEvent() will get blocked while 
this
+                        // 'token async event' is handled. Which is what we 
explicitly want.
                         lock.lock();
                         try{
-                            logger.debug("consistencyService.callback.run: 
lock aquired. (modCnt should be {}, is {})", lastModCnt, modCnt);
                             if (modCnt!=lastModCnt) {
-                                logger.info("consistencyService.callback.run: 
modCnt changed (from {} to {}) - ignoring",
+                                
logger.info("handleNewViewNonDelayed/waitForAsyncEvents.run: modCnt changed 
(from {} to {}) - ignoring",
                                         lastModCnt, modCnt);
                                 return;
                             }
-                            logger.info("consistencyService.callback.run: 
invoking doHandleConsistent.");
-                            // else:
-                            doHandleConsistent(newView);
+                            
logger.info("handleNewViewNonDelayed/waitForAsyncEvents.run: done, now invoking 
consistencyService (modCnt={})", modCnt);
+                            consistencyService.sync(newView,
+                                    new Runnable() {
+                                
+                                public void run() {
+                                    
logger.trace("consistencyService.callback.run: start. acquiring lock...");
+                                    lock.lock();
+                                    try{
+                                        
logger.debug("consistencyService.callback.run: lock aquired. (modCnt should be 
{}, is {})", lastModCnt, modCnt);
+                                        if (modCnt!=lastModCnt) {
+                                            
logger.info("consistencyService.callback.run: modCnt changed (from {} to {}) - 
ignoring",
+                                                    lastModCnt, modCnt);
+                                            return;
+                                        }
+                                        
logger.info("consistencyService.callback.run: invoking doHandleConsistent.");
+                                        // else:
+                                        doHandleConsistent(newView);
+                                    } finally {
+                                        lock.unlock();
+                                        
logger.trace("consistencyService.callback.run: end.");
+                                    }
+                                }
+                                
+                            });
                         } finally {
                             lock.unlock();
-                            logger.trace("consistencyService.callback.run: 
end.");
                         }
                     }
                     
@@ -628,22 +658,38 @@ public class ViewStateManagerImpl implements 
ViewStateManager {
     }
 
     @Override
-    public boolean waitForAsyncEvents(long timeout) {
+    public int waitForAsyncEvents(long timeout) {
         long end = System.currentTimeMillis() + timeout;
-        while(asyncEventSender.hasInFlightEvent() || 
-                (minEventDelayHandler!=null && 
minEventDelayHandler.isDelaying())) {
+        while(true) {
+            int inFlightEventCnt = getInFlightAsyncEventCnt();
+            if (inFlightEventCnt==0) {
+                // no in-flight events - return 0
+                return 0;
+            }
             if (timeout==0) {
-                return false;
+                // timeout is set to 'no-wait', but we have in-flight events,
+                // return the actual cnt
+                return inFlightEventCnt;
             }
-            if (timeout<0 || System.currentTimeMillis()<end) {
+            if (timeout<0 /*infinite waiting*/ || 
System.currentTimeMillis()<end) {
                 try {
                     Thread.sleep(50);
                 } catch (InterruptedException e) {
                     // ignore
                 }
+            } else {
+                // timeout hit
+                return inFlightEventCnt;
             }
         }
-        return true;
+    }
+    
+    private int getInFlightAsyncEventCnt() {
+        int cnt = asyncEventSender.getInFlightEventCnt();
+        if (minEventDelayHandler!=null && minEventDelayHandler.isDelaying()) {
+            cnt++;
+        }
+        return cnt;
     }
     
 }
diff --git 
a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
 
b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
index 23b6821..72ab0d2 100644
--- 
a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
+++ 
b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
@@ -19,6 +19,7 @@
 package org.apache.sling.discovery.commons.providers.base;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
@@ -134,6 +135,7 @@ public class TestHelper {
                 Thread.sleep(delayInMillis);
                 logger.debug("randomEventLoop: waiting "+delayInMillis+"ms 
done.");
             }
+            assertEquals(0, mgr.waitForAsyncEvents(500));
             if (!shouldCallChanging) {
                 // in that case I should still get a CHANGING - by contract
                 logger.debug("randomEventLoop: asserting CHANGING, CHANGED 
events were sent");
diff --git 
a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
 
b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
index 13d586d..e80268c 100644
--- 
a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
+++ 
b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
@@ -63,7 +63,9 @@ public class TestViewStateManager {
             try {
                 lock.unlock();
                 try{
+                    logger.info("ConsistencyServiceWithSemaphore.sync: 
acquiring lock ...");
                     semaphore.acquire();
+                    logger.info("ConsistencyServiceWithSemaphore.sync: lock 
acquired.");
                 } finally {
                     lock.lock();
                 }
@@ -102,6 +104,10 @@ public class TestViewStateManager {
     
     @After
     public void teardown() throws Exception {
+        if (mgr != null) {
+            // release any async event sender ..
+            mgr.handleDeactivated();
+        }
         mgr = null;
         defaultRandom= null;
     }
@@ -240,7 +246,7 @@ public class TestViewStateManager {
         mgr.handleChanging();
         final BaseTopologyView view = new DummyTopologyView().addInstance();
         mgr.handleNewView(view);
-        assertTrue(mgr.waitForAsyncEvents(1000));
+        assertEquals(0, mgr.waitForAsyncEvents(1000));
         TestHelper.assertNoEvents(listener);
         synchronized(syncCallbacks) {
             assertEquals(1, syncCallbacks.size());
@@ -249,14 +255,14 @@ public class TestViewStateManager {
         String id2 = UUID.randomUUID().toString();
         final BaseTopologyView view2 = TestHelper.newView(true, id1, id1, id1, 
id2); 
         mgr.handleNewView(view2);
-        assertTrue(mgr.waitForAsyncEvents(1000));
+        assertEquals(0, mgr.waitForAsyncEvents(1000));
         TestHelper.assertNoEvents(listener);
         synchronized(syncCallbacks) {
             assertEquals(1, syncCallbacks.size());
             syncCallbacks.get(0).run();
             syncCallbacks.clear();
         }
-        assertTrue(mgr.waitForAsyncEvents(1000));
+        assertEquals(0, mgr.waitForAsyncEvents(1000));
         assertEvents(listener, EventHelper.newInitEvent(view2));
     }
     
@@ -609,10 +615,12 @@ public class TestViewStateManager {
         });
         Thread.sleep(1000);
         TestHelper.assertNoEvents(listener);
+        assertEquals("should have one thread now waiting", 1, 
serviceSemaphore.getQueueLength());
         serviceSemaphore.release(1); // release the first one only
         Thread.sleep(1000);
         assertEvents(listener, EventHelper.newInitEvent(view1));
         mgr.handleChanging();
+        assertEquals(0, mgr.waitForAsyncEvents(500));
         assertEvents(listener, EventHelper.newChangingEvent(view1));
         async(new Runnable() {
 
@@ -625,6 +633,7 @@ public class TestViewStateManager {
         Thread.sleep(1000);
         logger.debug("run: asserting no events");
         TestHelper.assertNoEvents(listener);
+        assertEquals("should have one thread now waiting", 1, 
serviceSemaphore.getQueueLength());
         assertFalse("should not be locked", lock.isLocked());
 
         logger.debug("run: issuing a second event");
@@ -649,12 +658,16 @@ public class TestViewStateManager {
         });
         logger.debug("run: waiting 1sec");
         Thread.sleep(1000);
+        int remainingAsyncEvents = mgr.waitForAsyncEvents(2000);
+        logger.info("run: result of waitForAsyncEvent is: 
"+remainingAsyncEvents);
+        assertEquals("should have one thread now waiting", 1, 
serviceSemaphore.getQueueLength());
         assertEquals("should be acquiring (by thread2)", 1, 
testSemaphore.getQueueLength());
         // releasing the testSemaphore
         testSemaphore.release();
         logger.debug("run: waiting 1sec");
         Thread.sleep(1000);
-        assertEquals("should have both threads now waiting", 2, 
serviceSemaphore.getQueueLength());
+        assertEquals("should have two async events now in the queue or being 
sent", 2, mgr.waitForAsyncEvents(500));
+        assertEquals("but should only have 1 thread actually sitting on the 
semaphore waiting", 1, serviceSemaphore.getQueueLength());
         logger.debug("run: releasing consistencyService");
         serviceSemaphore.release(1); // release the first one only
         logger.debug("run: waiting 1sec");
diff --git 
a/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
 
b/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
index a774d76..357c8a5 100644
--- 
a/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
+++ 
b/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
@@ -136,7 +136,7 @@ public class TestOakSyncTokenConsistencyService {
         DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new 
DiscoveryLiteDescriptorBuilder().me(1).seq(1).activeIds(1).setFinal(true));
         assertTrue(idMapService1.waitForInit(2000));
         cs.triggerBackgroundCheck();
-        assertTrue(vsm.waitForAsyncEvents(1000));
+        assertEquals(0, vsm.waitForAsyncEvents(1000));
         assertEquals(1, l.countEvents());
     }
     
@@ -180,7 +180,10 @@ public class TestOakSyncTokenConsistencyService {
         DummyTopologyView two2 = 
TestHelper.newView(two1.getLocalClusterSyncTokenId(), 
two1.getLocalInstance().getClusterView().getId(), true, slingId1, slingId1, 
slingId1, slingId2);
         vsm2.handleNewView(two2);
         cs1.triggerBackgroundCheck();
+        cs1.triggerBackgroundCheck();
+        cs2.triggerBackgroundCheck();
         cs2.triggerBackgroundCheck();
+        assertEquals(0, vsm1.waitForAsyncEvents(500));
         assertEquals(1, l.countEvents());
         DummyTopologyView oneLeaving = two1.clone();
         oneLeaving.removeInstance(slingId2);

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to