Author: fmeschbe
Date: Fri Jan 13 16:42:12 2012
New Revision: 1231167
URL: http://svn.apache.org/viewvc?rev=1231167&view=rev
Log:
SLING-2365 Asynchronously post OSGi events. This also allows extracting more
information from the resource underlying the event (with the added benefit of
ensuring single-threaded access to the resource resolver).
Modified:
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
Modified:
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java?rev=1231167&r1=1231166&r2=1231167&view=diff
==============================================================================
---
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
(original)
+++
sling/trunk/bundles/jcr/resource/src/main/java/org/apache/sling/jcr/resource/internal/JcrResourceListener.java
Fri Jan 13 16:42:12 2012
@@ -25,6 +25,7 @@ import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
@@ -42,6 +43,7 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.jcr.resource.JcrResourceConstants;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +80,20 @@ public class JcrResourceListener impleme
private final boolean hasJackrabbitEventClass;
/**
+ * A queue of OSGi Events created by
+ * {@link #sendOsgiEvent(String, Event, String, ChangedAttributes)}
+ * waiting for actual dispatching to the OSGi Event Admin in
+ * {@link #processOsgiEventQueue()}
+ */
+ private final LinkedBlockingQueue<Dictionary<String, Object>>
osgiEventQueue;
+
+ /**
+ * Marker event for {@link #processOsgiEventQueue()} to be signaled to
+ * terminate processing Events.
+ */
+ private final Dictionary<String, Object> TERMINATE_PROCESSING = new
Hashtable<String, Object>(1);
+
+ /**
* Constructor.
* @param workspaceName The workspace name to observe
* @param factory The resource resolver factory.
@@ -103,6 +119,15 @@ public class JcrResourceListener impleme
this.startPath = startPath;
this.eventAdminTracker = eventAdminTracker;
this.mountPrefix = (mountPrefix.equals("/") ? null : mountPrefix);
+
+ this.osgiEventQueue = new
LinkedBlockingQueue<Dictionary<String,Object>>();
+ Thread oeqt = new Thread(new Runnable() {
+ public void run() {
+ processOsgiEventQueue();
+ }
+ }, "JCR Resource Event Queue Processor");
+ oeqt.start();
+
this.session.getWorkspace().getObservationManager().addEventListener(this,
Event.NODE_ADDED|Event.NODE_REMOVED|Event.PROPERTY_ADDED|Event.PROPERTY_CHANGED|Event.PROPERTY_REMOVED,
this.startPath, true, null, null, false);
@@ -120,12 +145,18 @@ public class JcrResourceListener impleme
* Dispose this listener.
*/
public void dispose() {
+
+ // unregister from observations
try {
this.session.getWorkspace().getObservationManager().removeEventListener(this);
} catch (RepositoryException e) {
logger.warn("Unable to remove session listener: " + this, e);
}
this.resolver.close();
+
+ // drop any remaining OSGi Events not processed yet
+ this.osgiEventQueue.clear();
+ this.osgiEventQueue.offer(TERMINATE_PROCESSING);
}
/**
@@ -182,18 +213,18 @@ public class JcrResourceListener impleme
for (final Entry<String, Event> e : removedEvents.entrySet()) {
// Launch an OSGi event
- sendOsgiEvent(e.getKey(), e.getValue(),
SlingConstants.TOPIC_RESOURCE_REMOVED, localEA, null);
+ sendOsgiEvent(e.getKey(), e.getValue(),
SlingConstants.TOPIC_RESOURCE_REMOVED, null);
}
for (final Entry<String, Event> e : addedEvents.entrySet()) {
// Launch an OSGi event.
- sendOsgiEvent(e.getKey(), e.getValue(),
SlingConstants.TOPIC_RESOURCE_ADDED, localEA, null);
+ sendOsgiEvent(e.getKey(), e.getValue(),
SlingConstants.TOPIC_RESOURCE_ADDED, null);
}
// Send the changed events.
for (final Entry<String, ChangedAttributes> e :
changedEvents.entrySet()) {
// Launch an OSGi event.
- sendOsgiEvent(e.getKey(), e.getValue().firstEvent,
SlingConstants.TOPIC_RESOURCE_CHANGED, localEA, e.getValue());
+ sendOsgiEvent(e.getKey(), e.getValue().firstEvent,
SlingConstants.TOPIC_RESOURCE_CHANGED, e.getValue());
}
}
@@ -263,63 +294,98 @@ public class JcrResourceListener impleme
* @param path The path to the node where the event occurred.
* @param event The JCR observation event.
* @param topic The topic that should be used for the OSGi event.
- * @param localEA The OSGi Event Admin that can be used to post events.
*/
- private void sendOsgiEvent(String path, final Event event, final String
topic, final EventAdmin localEA,
+ private void sendOsgiEvent(String path, final Event event, final String
topic,
final ChangedAttributes changedAttributes) {
path = createWorkspacePath(path);
final Dictionary<String, Object> properties = new Hashtable<String,
Object>();
properties.put(SlingConstants.PROPERTY_USERID, event.getUserID());
- if ( this.isExternal(event) ) {
+ if (this.isExternal(event)) {
properties.put("event.application", "unknown");
}
- if ( changedAttributes != null ) {
+ if (changedAttributes != null) {
changedAttributes.addProperties(properties);
}
- if (!SlingConstants.TOPIC_RESOURCE_REMOVED.equals(topic)) {
- Resource resource = this.resolver.getResource(path);
- if (resource != null) {
- // check for nt:file nodes
- if (path.endsWith("/jcr:content")) {
- final Node node = resource.adaptTo(Node.class);
- if (node != null) {
- try {
- if (node.getParent().isNodeType("nt:file")) {
- @SuppressWarnings("deprecation")
- final Resource parentResource =
ResourceUtil.getParent(resource);
- if (parentResource != null) {
- resource = parentResource;
- path = resource.getPath();
+ // set the path (may be replaced with the nt:file parent path when the queue is processed)
+ properties.put(SlingConstants.PROPERTY_PATH, path);
+ properties.put(EventConstants.EVENT_TOPIC, topic);
+
+ // enqueue event for dispatching
+ this.osgiEventQueue.offer(properties);
+ }
+
+ /**
+ * Called by the Runnable.run method of the JCR Event Queue processor to
+ * process the {@link #osgiEventQueue} until the
+ * {@link #TERMINATE_PROCESSING} event is received.
+ */
+ void processOsgiEventQueue() {
+ while (true) {
+ final Dictionary<String, Object> event;
+ try {
+ event = this.osgiEventQueue.take();
+ } catch (InterruptedException e) {
+ // interrupted waiting for the event; keep on waiting
+ continue;
+ }
+
+ if (event == null || event == TERMINATE_PROCESSING) {
+ break;
+ }
+
+ try {
+ final EventAdmin localEa = (EventAdmin)
this.eventAdminTracker.getService();
+ if (localEa != null) {
+ final String topic = (String)
event.remove(EventConstants.EVENT_TOPIC);
+ if (!SlingConstants.TOPIC_RESOURCE_REMOVED.equals(topic)) {
+ final String path = (String)
event.get(SlingConstants.PROPERTY_PATH);
+ Resource resource = this.resolver.getResource(path);
+ if (resource != null) {
+ // check for nt:file nodes
+ if (path.endsWith("/jcr:content")) {
+ final Node node = resource.adaptTo(Node.class);
+ if (node != null) {
+ try {
+ if
(node.getParent().isNodeType("nt:file")) {
+ @SuppressWarnings("deprecation")
+ final Resource parentResource =
ResourceUtil.getParent(resource);
+ if (parentResource != null) {
+ resource = parentResource;
+
event.put(SlingConstants.PROPERTY_PATH, resource.getPath());
+ }
+ }
+ } catch (RepositoryException re) {
+ // ignore this
+ }
}
}
- } catch (RepositoryException re) {
- // ignore this
+
+ final String resourceType =
resource.getResourceType();
+ if (resourceType != null) {
+
event.put(SlingConstants.PROPERTY_RESOURCE_TYPE, resource.getResourceType());
+ }
+ final String resourceSuperType =
resource.getResourceSuperType();
+ if (resourceSuperType != null) {
+
event.put(SlingConstants.PROPERTY_RESOURCE_SUPER_TYPE,
resource.getResourceSuperType());
+ }
+ } else {
+ logger.error(
+ "processOsgiEventQueue: Resource at {} not
found, which is not expected for an added or modified node",
+ path);
}
}
- }
- final String resourceType = resource.getResourceType();
- if (resourceType != null) {
- properties.put(SlingConstants.PROPERTY_RESOURCE_TYPE,
resource.getResourceType());
+ localEa.postEvent(new org.osgi.service.event.Event(topic,
event));
}
- final String resourceSuperType =
resource.getResourceSuperType();
- if (resourceSuperType != null) {
-
properties.put(SlingConstants.PROPERTY_RESOURCE_SUPER_TYPE,
resource.getResourceSuperType());
- }
- } else {
- logger.error(
- "sendOsgiEvent: Resource at {} not found, which is not
expected for an added or modified node",
- path);
+ } catch (Exception e) {
+ logger.warn("processOsgiEventQueue: Unexpected problem
processing event " + event, e);
}
}
- // set the path (might have been changed for nt:file content)
- properties.put(SlingConstants.PROPERTY_PATH, path);
-
- localEA.postEvent(new org.osgi.service.event.Event(topic, properties));
+ this.osgiEventQueue.clear();
}
private boolean isExternal(final Event event) {