Author: fmeschbe
Date: Fri Jan 13 16:42:12 2012
New Revision: 1231167

URL: http://svn.apache.org/viewvc?rev=1231167&view=rev
Log:
SLING-2365 Post OSGi events asynchronously. Extracting additional information
from the resource underlying the event is now also done asynchronously (with the
added benefit of ensuring single-threaded access to the resource resolver).

Modified:
    
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java

Modified: 
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java?rev=1231167&r1=1231166&r2=1231167&view=diff
==============================================================================
--- 
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
 (original)
+++ 
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
 Fri Jan 13 16:42:12 2012
@@ -25,6 +25,7 @@ import java.util.Hashtable;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.Node;
 import javax.jcr.RepositoryException;
@@ -42,6 +43,7 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.jcr.resource.JcrResourceConstants;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,6 +80,20 @@ public class JcrResourceListener impleme
     private final boolean hasJackrabbitEventClass;
 
     /**
+     * A queue of OSGi Events created by
+     * {@link #sendOsgiEvent(String, Event, String, EventAdmin, 
ChangedAttributes)}
+     * waiting for actual dispatching to the OSGi Event Admin in
+     * {@link #processOsgiEventQueue()}
+     */
+    private final LinkedBlockingQueue<Dictionary<String, Object>> 
osgiEventQueue;
+
+    /**
+     * Marker event for {@link #processOsgiEventQueue()} to be signaled to
+     * terminate processing Events.
+     */
+    private final Dictionary<String, Object> TERMINATE_PROCESSING = new 
Hashtable<String, Object>(1);
+
+    /**
      * Constructor.
      * @param workspaceName The workspace name to observe
      * @param factory    The resource resolver factory.
@@ -103,6 +119,15 @@ public class JcrResourceListener impleme
         this.startPath = startPath;
         this.eventAdminTracker = eventAdminTracker;
         this.mountPrefix = (mountPrefix.equals("/") ? null : mountPrefix);
+
+        this.osgiEventQueue = new 
LinkedBlockingQueue<Dictionary<String,Object>>();
+        Thread oeqt = new Thread(new Runnable() {
+            public void run() {
+                processOsgiEventQueue();
+            }
+        }, "JCR Resource Event Queue Processor");
+        oeqt.start();
+
         
this.session.getWorkspace().getObservationManager().addEventListener(this,
             
Event.NODE_ADDED|Event.NODE_REMOVED|Event.PROPERTY_ADDED|Event.PROPERTY_CHANGED|Event.PROPERTY_REMOVED,
             this.startPath, true, null, null, false);
@@ -120,12 +145,18 @@ public class JcrResourceListener impleme
      * Dispose this listener.
      */
     public void dispose() {
+
+        // unregister from observations
         try {
             
this.session.getWorkspace().getObservationManager().removeEventListener(this);
         } catch (RepositoryException e) {
             logger.warn("Unable to remove session listener: " + this, e);
         }
         this.resolver.close();
+
+        // drop any remaining OSGi Events not processed yet
+        this.osgiEventQueue.clear();
+        this.osgiEventQueue.offer(TERMINATE_PROCESSING);
     }
 
     /**
@@ -182,18 +213,18 @@ public class JcrResourceListener impleme
 
         for (final Entry<String, Event> e : removedEvents.entrySet()) {
             // Launch an OSGi event
-            sendOsgiEvent(e.getKey(), e.getValue(), 
SlingConstants.TOPIC_RESOURCE_REMOVED, localEA, null);
+            sendOsgiEvent(e.getKey(), e.getValue(), 
SlingConstants.TOPIC_RESOURCE_REMOVED, null);
         }
 
         for (final Entry<String, Event> e : addedEvents.entrySet()) {
             // Launch an OSGi event.
-            sendOsgiEvent(e.getKey(), e.getValue(), 
SlingConstants.TOPIC_RESOURCE_ADDED, localEA, null);
+            sendOsgiEvent(e.getKey(), e.getValue(), 
SlingConstants.TOPIC_RESOURCE_ADDED, null);
         }
 
         // Send the changed events.
         for (final Entry<String, ChangedAttributes> e : 
changedEvents.entrySet()) {
             // Launch an OSGi event.
-            sendOsgiEvent(e.getKey(), e.getValue().firstEvent, 
SlingConstants.TOPIC_RESOURCE_CHANGED, localEA, e.getValue());
+            sendOsgiEvent(e.getKey(), e.getValue().firstEvent, 
SlingConstants.TOPIC_RESOURCE_CHANGED, e.getValue());
         }
     }
 
@@ -263,63 +294,98 @@ public class JcrResourceListener impleme
      * @param path The path too the node where the event occurred.
      * @param event The JCR observation event.
      * @param topic The topic that should be used for the OSGi event.
-     * @param localEA The OSGi Event Admin that can be used to post events.
      */
-    private void sendOsgiEvent(String path, final Event event, final String 
topic, final EventAdmin localEA,
+    private void sendOsgiEvent(String path, final Event event, final String 
topic,
             final ChangedAttributes changedAttributes) {
 
         path = createWorkspacePath(path);
 
         final Dictionary<String, Object> properties = new Hashtable<String, 
Object>();
         properties.put(SlingConstants.PROPERTY_USERID, event.getUserID());
-        if ( this.isExternal(event) ) {
+        if (this.isExternal(event)) {
             properties.put("event.application", "unknown");
         }
-        if ( changedAttributes != null ) {
+        if (changedAttributes != null) {
             changedAttributes.addProperties(properties);
         }
 
-        if (!SlingConstants.TOPIC_RESOURCE_REMOVED.equals(topic)) {
-            Resource resource = this.resolver.getResource(path);
-            if (resource != null) {
-                // check for nt:file nodes
-                if (path.endsWith("/jcr:content")) {
-                    final Node node = resource.adaptTo(Node.class);
-                    if (node != null) {
-                        try {
-                            if (node.getParent().isNodeType("nt:file")) {
-                                @SuppressWarnings("deprecation")
-                                final Resource parentResource = 
ResourceUtil.getParent(resource);
-                                if (parentResource != null) {
-                                    resource = parentResource;
-                                    path = resource.getPath();
+        // set the path (might have been changed for nt:file content)
+        properties.put(SlingConstants.PROPERTY_PATH, path);
+        properties.put(EventConstants.EVENT_TOPIC, topic);
+
+        // enqueue event for dispatching
+        this.osgiEventQueue.offer(properties);
+    }
+
+    /**
+     * Called by the Runnable.run method of the JCR Event Queue processor to
+     * process the {@link #osgiEventQueue} until the
+     * {@link #TERMINATE_PROCESSING} event is received.
+     */
+    void processOsgiEventQueue() {
+        while (true) {
+            final Dictionary<String, Object> event;
+            try {
+                event = this.osgiEventQueue.take();
+            } catch (InterruptedException e) {
+                // interrupted waiting for the event; keep on waiting
+                continue;
+            }
+
+            if (event == null || event == TERMINATE_PROCESSING) {
+                break;
+            }
+
+            try {
+                final EventAdmin localEa = (EventAdmin) 
this.eventAdminTracker.getService();
+                if (localEa != null) {
+                    final String topic = (String) 
event.remove(EventConstants.EVENT_TOPIC);
+                    if (!SlingConstants.TOPIC_RESOURCE_REMOVED.equals(topic)) {
+                        final String path = (String) 
event.get(SlingConstants.PROPERTY_PATH);
+                        Resource resource = this.resolver.getResource(path);
+                        if (resource != null) {
+                            // check for nt:file nodes
+                            if (path.endsWith("/jcr:content")) {
+                                final Node node = resource.adaptTo(Node.class);
+                                if (node != null) {
+                                    try {
+                                        if 
(node.getParent().isNodeType("nt:file")) {
+                                            @SuppressWarnings("deprecation")
+                                            final Resource parentResource = 
ResourceUtil.getParent(resource);
+                                            if (parentResource != null) {
+                                                resource = parentResource;
+                                                
event.put(SlingConstants.PROPERTY_PATH, resource.getPath());
+                                            }
+                                        }
+                                    } catch (RepositoryException re) {
+                                        // ignore this
+                                    }
                                 }
                             }
-                        } catch (RepositoryException re) {
-                            // ignore this
+
+                            final String resourceType = 
resource.getResourceType();
+                            if (resourceType != null) {
+                                
event.put(SlingConstants.PROPERTY_RESOURCE_TYPE, resource.getResourceType());
+                            }
+                            final String resourceSuperType = 
resource.getResourceSuperType();
+                            if (resourceSuperType != null) {
+                                
event.put(SlingConstants.PROPERTY_RESOURCE_SUPER_TYPE, 
resource.getResourceSuperType());
+                            }
+                        } else {
+                            logger.error(
+                                "processOsgiEventQueue: Resource at {} not 
found, which is not expected for an added or modified node",
+                                path);
                         }
                     }
-                }
 
-                final String resourceType = resource.getResourceType();
-                if (resourceType != null) {
-                    properties.put(SlingConstants.PROPERTY_RESOURCE_TYPE, 
resource.getResourceType());
+                    localEa.postEvent(new org.osgi.service.event.Event(topic, 
event));
                 }
-                final String resourceSuperType = 
resource.getResourceSuperType();
-                if (resourceSuperType != null) {
-                    
properties.put(SlingConstants.PROPERTY_RESOURCE_SUPER_TYPE, 
resource.getResourceSuperType());
-                }
-            } else {
-                logger.error(
-                    "sendOsgiEvent: Resource at {} not found, which is not 
expected for an added or modified node",
-                    path);
+            } catch (Exception e) {
+                logger.warn("processOsgiEventQueue: Unexpected problem 
processing event " + event, e);
             }
         }
 
-        // set the path (might have been changed for nt:file content)
-        properties.put(SlingConstants.PROPERTY_PATH, path);
-
-        localEA.postEvent(new org.osgi.service.event.Event(topic, properties));
+        this.osgiEventQueue.clear();
     }
 
     private boolean isExternal(final Event event) {


Reply via email to